Refactor the math engine to compile the query and use eval

This change makes it so that we simplify the math engine so it doesn't
use a complicated set of nested iterators. That way, we have to change
math in one fewer place.

It also greatly simplifies the query engine as now we can create the
necessary iterators, join them by time, name, and tags, and then use the
cursor interface to read them and use eval to compute the result. It
makes it so the auxiliary iterators and all of their complexity can be
removed.

This also makes use of the new eval functionality that was recently
added to the influxql package.

No math functions have been added, but the scaffolding has been included
so things like trigonometry functions are just a single commit away.

This also introduces a small breaking change. Because of the call
optimization, it is now possible to use the same selector multiple times
as a selector. So if you do this:

    SELECT max(value) * 2, max(value) / 2 FROM cpu

This will now return the timestamp of the max value rather than zero
since this query is considered to have only a single selector rather
than multiple separate selectors. If any aspect of the selector is
different, such as different selector functions or different arguments,
it will consider the selectors to be aggregates like the old behavior.
pull/9563/head
Jonathan A. Sternberg 2018-03-19 12:05:55 -05:00
parent c720b3b40c
commit f8d60a881d
16 changed files with 1224 additions and 2905 deletions

View File

@ -1,6 +1,10 @@
v1.6.0 [unreleased]
-------------------
### Breaking changes
- If math is used with the same selector multiple times, it will now act as a selector rather than an aggregate. See [#9563](https://github.com/influxdata/influxdb/pull/9563) for details.
### Features
- [#9429](https://github.com/influxdata/influxdb/pull/9429): Support proxy environment variables in the influx client.

View File

@ -95,13 +95,13 @@ func newCompiler(opt CompileOptions) *compiledStatement {
func Compile(stmt *influxql.SelectStatement, opt CompileOptions) (Statement, error) {
c := newCompiler(opt)
if err := c.preprocess(stmt); err != nil {
return nil, err
}
if err := c.compile(stmt); err != nil {
return nil, err
}
c.stmt = stmt.Clone()
if err := c.preprocess(c.stmt); err != nil {
return nil, err
}
if err := c.compile(c.stmt); err != nil {
return nil, err
}
c.stmt.TimeAlias = c.TimeFieldName
c.stmt.Condition = c.Condition
@ -127,6 +127,10 @@ func (c *compiledStatement) preprocess(stmt *influxql.SelectStatement) error {
if err != nil {
return err
}
// Verify that the condition is actually ok to use.
if err := c.validateCondition(cond); err != nil {
return err
}
c.Condition = cond
c.TimeRange = t
@ -170,6 +174,7 @@ func (c *compiledStatement) compile(stmt *influxql.SelectStatement) error {
for _, source := range stmt.Sources {
switch source := source.(type) {
case *influxql.SubQuery:
source.Statement.OmitTime = true
if err := c.subquery(source.Statement); err != nil {
return err
}
@ -179,6 +184,8 @@ func (c *compiledStatement) compile(stmt *influxql.SelectStatement) error {
}
func (c *compiledStatement) compileFields(stmt *influxql.SelectStatement) error {
valuer := MathValuer{}
c.Fields = make([]*compiledField, 0, len(stmt.Fields))
for _, f := range stmt.Fields {
// Remove any time selection (it is automatically selected by default)
@ -194,12 +201,10 @@ func (c *compiledStatement) compileFields(stmt *influxql.SelectStatement) error
}
// Append this field to the list of processed fields and compile it.
f.Expr = influxql.Reduce(f.Expr, &valuer)
field := &compiledField{
global: c,
Field: &influxql.Field{
Expr: influxql.Reduce(f.Expr, nil),
Alias: f.Alias,
},
global: c,
Field: f,
AllowWildcard: true,
}
c.Fields = append(c.Fields, field)
@ -244,6 +249,11 @@ func (c *compiledField) compileExpr(expr influxql.Expr) error {
c.global.HasAuxiliaryFields = true
return nil
case *influxql.Call:
if isMathFunction(expr) {
// TODO(jsternberg): Implement validation for any math functions.
return nil
}
// Register the function call in the list of function calls.
c.global.FunctionCalls = append(c.global.FunctionCalls, expr)
@ -305,6 +315,8 @@ func (c *compiledField) compileExpr(expr influxql.Expr) error {
}
case *influxql.ParenExpr:
return c.compileExpr(expr.Expr)
case influxql.Literal:
return errors.New("field must contain at least one variable")
}
return errors.New("unimplemented")
}
@ -752,6 +764,35 @@ func (c *compiledStatement) validateFields() error {
return nil
}
// validateCondition verifies that all elements in the condition are appropriate.
// For example, aggregate calls don't work in the condition and should throw an
// error as an invalid expression.
func (c *compiledStatement) validateCondition(expr influxql.Expr) error {
switch expr := expr.(type) {
case *influxql.BinaryExpr:
// Verify each side of the binary expression. We do not need to
// verify the binary expression itself since that should have been
// done by influxql.ConditionExpr.
if err := c.validateCondition(expr.LHS); err != nil {
return err
}
if err := c.validateCondition(expr.RHS); err != nil {
return err
}
return nil
case *influxql.Call:
if !isMathFunction(expr) {
return fmt.Errorf("invalid function call in condition: %s", expr)
}
if exp, got := 1, len(expr.Args); exp != got {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
}
return c.validateCondition(expr.Args[0])
default:
return nil
}
}
// subquery compiles and validates a compiled statement for the subquery using
// this compiledStatement as the parent.
func (c *compiledStatement) subquery(stmt *influxql.SelectStatement) error {
@ -763,8 +804,11 @@ func (c *compiledStatement) subquery(stmt *influxql.SelectStatement) error {
// Substitute now() into the subquery condition. Then use ConditionExpr to
// validate the expression. Do not store the results. We have no way to store
// and read those results at the moment.
valuer := influxql.NowValuer{Now: c.Options.Now, Location: stmt.Location}
stmt.Condition = influxql.Reduce(stmt.Condition, &valuer)
valuer := influxql.MultiValuer(
&influxql.NowValuer{Now: c.Options.Now, Location: stmt.Location},
&MathValuer{},
)
stmt.Condition = influxql.Reduce(stmt.Condition, valuer)
// If the ordering is different and the sort field was specified for the subquery,
// throw an error.
@ -843,6 +887,12 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions)
return nil, err
}
// Validate if the types are correct now that they have been assigned.
if err := validateTypes(stmt); err != nil {
shards.Close()
return nil, err
}
// Determine base options for iterators.
opt, err := newIteratorOptionsStmt(stmt, sopt)
if err != nil {

View File

@ -1,7 +1,6 @@
package query
import (
"fmt"
"time"
"github.com/influxdata/influxql"
@ -73,6 +72,9 @@ type Cursor interface {
// the previous values will be overwritten while using the same memory.
Scan(row *Row) bool
// Stats returns the IteratorStats from the underlying iterators.
Stats() IteratorStats
// Err returns any errors that were encountered from scanning the rows.
Err() error
@ -83,188 +85,6 @@ type Cursor interface {
Close() error
}
type cursor struct {
buf []Point
itrs []Iterator
ascending bool
series Series
err error
columns []influxql.VarRef
omitTime bool
loc *time.Location
}
func newCursor(itrs []Iterator, columns []influxql.VarRef, ascending bool) *cursor {
return &cursor{
buf: make([]Point, len(itrs)),
itrs: itrs,
ascending: ascending,
columns: columns,
loc: time.UTC,
}
}
func (cur *cursor) Scan(row *Row) bool {
// Immediately end the scan if there are no iterators.
if len(cur.itrs) == 0 {
return false
}
t, name, tags, err := cur.loadBuf()
if err != nil {
cur.err = err
return false
}
if t == ZeroTime {
return false
}
row.Time = t
// Check to see if the name or tags have changed.
// If they have, copy them over and update the series id.
if name != cur.series.Name || tags.ID() != cur.series.Tags.ID() {
cur.series.id++
cur.series.Name = name
cur.series.Tags = tags
}
row.Series = cur.series
// If the row values is not large enough, allocate a slice.
if len(row.Values) < len(cur.columns) {
row.Values = make([]interface{}, len(cur.columns))
}
if !cur.omitTime {
row.Values[0] = time.Unix(0, t).In(cur.loc)
cur.readInto(t, name, tags, row.Values[1:])
} else {
cur.readInto(t, name, tags, row.Values)
}
return true
}
// loadBuf reads in points into empty buffer slots.
// Returns the next time/name/tags to emit for.
func (cur *cursor) loadBuf() (t int64, name string, tags Tags, err error) {
t = ZeroTime
for i := range cur.itrs {
// Load buffer, if empty.
if cur.buf[i] == nil {
cur.buf[i], err = cur.readIterator(cur.itrs[i])
if err != nil {
break
}
}
// Skip if buffer is empty.
p := cur.buf[i]
if p == nil {
continue
}
itrTime, itrName, itrTags := p.time(), p.name(), p.tags()
// Initialize range values if not set.
if t == ZeroTime {
t, name, tags = itrTime, itrName, itrTags
continue
}
// Update range values if lower and emitter is in time ascending order.
if cur.ascending {
if (itrName < name) || (itrName == name && itrTags.ID() < tags.ID()) || (itrName == name && itrTags.ID() == tags.ID() && itrTime < t) {
t, name, tags = itrTime, itrName, itrTags
}
continue
}
// Update range values if higher and emitter is in time descending order.
if (itrName > name) || (itrName == name && itrTags.ID() > tags.ID()) || (itrName == name && itrTags.ID() == tags.ID() && itrTime > t) {
t, name, tags = itrTime, itrName, itrTags
}
}
return
}
func (cur *cursor) readInto(t int64, name string, tags Tags, values []interface{}) {
for i, p := range cur.buf {
// Skip if buffer is empty.
if p == nil {
values[i] = nil
continue
}
// Skip point if it doesn't match time/name/tags.
pTags := p.tags()
if p.time() != t || p.name() != name || !pTags.Equals(&tags) {
values[i] = nil
continue
}
// Read point value.
values[i] = p.value()
// Clear buffer.
cur.buf[i] = nil
}
}
// readIterator reads the next point from itr.
func (cur *cursor) readIterator(itr Iterator) (Point, error) {
if itr == nil {
return nil, nil
}
switch itr := itr.(type) {
case FloatIterator:
if p, err := itr.Next(); err != nil {
return nil, err
} else if p != nil {
return p, nil
}
case IntegerIterator:
if p, err := itr.Next(); err != nil {
return nil, err
} else if p != nil {
return p, nil
}
case UnsignedIterator:
if p, err := itr.Next(); err != nil {
return nil, err
} else if p != nil {
return p, nil
}
case StringIterator:
if p, err := itr.Next(); err != nil {
return nil, err
} else if p != nil {
return p, nil
}
case BooleanIterator:
if p, err := itr.Next(); err != nil {
return nil, err
} else if p != nil {
return p, nil
}
default:
panic(fmt.Sprintf("unsupported iterator: %T", itr))
}
return nil, nil
}
func (cur *cursor) Err() error {
return cur.err
}
func (cur *cursor) Columns() []influxql.VarRef {
return cur.columns
}
func (cur *cursor) Close() error {
return Iterators(cur.itrs).Close()
}
// RowCursor returns a Cursor that iterates over Rows.
func RowCursor(rows []Row, columns []influxql.VarRef) Cursor {
return &rowCursor{
@ -295,6 +115,10 @@ func (cur *rowCursor) Scan(row *Row) bool {
return true
}
func (cur *rowCursor) Stats() IteratorStats {
return IteratorStats{}
}
func (cur *rowCursor) Err() error {
return nil
}
@ -307,6 +131,195 @@ func (cur *rowCursor) Close() error {
return nil
}
type scannerFunc func(m map[string]interface{}) (int64, string, Tags)
type scannerCursorBase struct {
fields []influxql.Expr
m map[string]interface{}
series Series
columns []influxql.VarRef
loc *time.Location
scan scannerFunc
}
func newScannerCursorBase(scan scannerFunc, fields []*influxql.Field, loc *time.Location) scannerCursorBase {
typmap := FunctionTypeMapper{}
exprs := make([]influxql.Expr, len(fields))
columns := make([]influxql.VarRef, len(fields))
for i, f := range fields {
exprs[i] = f.Expr
columns[i] = influxql.VarRef{
Val: f.Name(),
Type: influxql.EvalType(f.Expr, nil, typmap),
}
}
if loc == nil {
loc = time.UTC
}
return scannerCursorBase{
fields: exprs,
m: make(map[string]interface{}),
columns: columns,
loc: loc,
scan: scan,
}
}
func (cur *scannerCursorBase) Scan(row *Row) bool {
ts, name, tags := cur.scan(cur.m)
if ts == ZeroTime {
return false
}
row.Time = ts
if name != cur.series.Name || tags.ID() != cur.series.Tags.ID() {
cur.series.Name = name
cur.series.Tags = tags
cur.series.id++
}
row.Series = cur.series
if len(cur.columns) > len(row.Values) {
row.Values = make([]interface{}, len(cur.columns))
}
valuer := influxql.ValuerEval{
Valuer: &MathValuer{
Valuer: influxql.MapValuer(cur.m),
},
IntegerFloatDivision: true,
}
for i, expr := range cur.fields {
// A special case if the field is time to reduce memory allocations.
if ref, ok := expr.(*influxql.VarRef); ok && ref.Val == "time" {
row.Values[i] = time.Unix(0, row.Time).In(cur.loc)
continue
}
row.Values[i] = valuer.Eval(expr)
}
return true
}
func (cur *scannerCursorBase) Columns() []influxql.VarRef {
return cur.columns
}
var _ Cursor = (*scannerCursor)(nil)
type scannerCursor struct {
scanner IteratorScanner
scannerCursorBase
}
func newScannerCursor(s IteratorScanner, fields []*influxql.Field, opt IteratorOptions) *scannerCursor {
cur := &scannerCursor{scanner: s}
cur.scannerCursorBase = newScannerCursorBase(cur.scan, fields, opt.Location)
return cur
}
func (s *scannerCursor) scan(m map[string]interface{}) (int64, string, Tags) {
ts, name, tags := s.scanner.Peek()
if ts == ZeroTime {
return ts, name, tags
}
s.scanner.ScanAt(ts, name, tags, m)
return ts, name, tags
}
func (cur *scannerCursor) Stats() IteratorStats {
return cur.scanner.Stats()
}
func (cur *scannerCursor) Err() error {
return cur.scanner.Err()
}
func (cur *scannerCursor) Close() error {
return cur.scanner.Close()
}
var _ Cursor = (*multiScannerCursor)(nil)
type multiScannerCursor struct {
scanners []IteratorScanner
err error
ascending bool
scannerCursorBase
}
func newMultiScannerCursor(scanners []IteratorScanner, fields []*influxql.Field, opt IteratorOptions) *multiScannerCursor {
cur := &multiScannerCursor{
scanners: scanners,
ascending: opt.Ascending,
}
cur.scannerCursorBase = newScannerCursorBase(cur.scan, fields, opt.Location)
return cur
}
func (cur *multiScannerCursor) scan(m map[string]interface{}) (ts int64, name string, tags Tags) {
ts = ZeroTime
for _, s := range cur.scanners {
curTime, curName, curTags := s.Peek()
if curTime == ZeroTime {
if err := s.Err(); err != nil {
cur.err = err
return ZeroTime, "", Tags{}
}
continue
}
if ts == ZeroTime {
ts, name, tags = curTime, curName, curTags
continue
}
if cur.ascending {
if (curName < name) || (curName == name && curTags.ID() < tags.ID()) || (curName == name && curTags.ID() == tags.ID() && curTime < ts) {
ts, name, tags = curTime, curName, curTags
}
continue
}
if (curName > name) || (curName == name && curTags.ID() > tags.ID()) || (curName == name && curTags.ID() == tags.ID() && curTime > ts) {
ts, name, tags = curTime, curName, curTags
}
}
if ts == ZeroTime {
return ts, name, tags
}
for _, s := range cur.scanners {
s.ScanAt(ts, name, tags, m)
}
return ts, name, tags
}
func (cur *multiScannerCursor) Stats() IteratorStats {
var stats IteratorStats
for _, s := range cur.scanners {
stats.Add(s.Stats())
}
return stats
}
func (cur *multiScannerCursor) Err() error {
return cur.err
}
func (cur *multiScannerCursor) Close() error {
var err error
for _, s := range cur.scanners {
if e := s.Close(); e != nil && err == nil {
err = e
}
}
return err
}
// DrainCursor will read and discard all values from a Cursor and return the error
// if one happens.
func DrainCursor(cur Cursor) error {

View File

@ -152,14 +152,6 @@ func NewContextWithIterators(ctx context.Context, itr *Iterators) context.Contex
return context.WithValue(ctx, iteratorsContextKey, itr)
}
// tryAddAuxIteratorToContext will capture itr in the *Iterators slice, when configured
// with a call to NewContextWithIterators.
func tryAddAuxIteratorToContext(ctx context.Context, itr AuxIterator) {
if v, ok := ctx.Value(iteratorsContextKey).(*Iterators); ok {
*v = append(*v, itr)
}
}
// StatementExecutor executes a statement within the Executor.
type StatementExecutor interface {
// ExecuteStatement executes a statement. Results should be sent to the

View File

@ -31,20 +31,49 @@ func (m FieldMapper) CallType(name string, args []influxql.DataType) (influxql.D
return typmap.CallType(name, args)
}
type FunctionTypeMapper struct{}
// CallTypeMapper returns the types for call iterator functions.
// Call iterator functions are commonly implemented within the storage engine
// so this mapper is limited to only the return values of those functions.
type CallTypeMapper struct{}
func (CallTypeMapper) MapType(measurement *influxql.Measurement, field string) influxql.DataType {
return influxql.Unknown
}
func (CallTypeMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error) {
// If the function is not implemented by the embedded field mapper, then
// see if we implement the function and return the type here.
switch name {
case "mean":
return influxql.Float, nil
case "count":
return influxql.Integer, nil
case "min", "max", "sum", "first", "last":
// TODO(jsternberg): Verify the input type.
return args[0], nil
}
return influxql.Unknown, nil
}
// FunctionTypeMapper handles the type mapping for all functions implemented by the
// query engine.
type FunctionTypeMapper struct {
CallTypeMapper
}
func (FunctionTypeMapper) MapType(measurement *influxql.Measurement, field string) influxql.DataType {
return influxql.Unknown
}
func (FunctionTypeMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error) {
// If the function is not implemented by the embedded field mapper, then
// see if we implement the function and return the type here.
func (m FunctionTypeMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error) {
if typ, err := m.CallTypeMapper.CallType(name, args); typ != influxql.Unknown || err != nil {
return typ, err
}
// Handle functions implemented by the query engine.
switch name {
case "mean", "median", "integral", "stddev":
case "median", "integral", "stddev":
return influxql.Float, nil
case "count":
return influxql.Integer, nil
case "elapsed":
return influxql.Integer, nil
default:

File diff suppressed because it is too large Load Diff

View File

@ -490,6 +490,95 @@ type {{$k.name}}SortedMergeHeapItem struct {
itr {{$k.Name}}Iterator
}
// {{$k.name}}IteratorScanner scans the results of a {{$k.Name}}Iterator into a map.
type {{$k.name}}IteratorScanner struct {
input *buf{{$k.Name}}Iterator
err error
keys []string
defaultValue interface{}
}
// new{{$k.Name}}IteratorScanner creates a new IteratorScanner.
func new{{$k.Name}}IteratorScanner(input {{$k.Name}}Iterator, keys []string, defaultValue interface{}) *{{$k.name}}IteratorScanner {
return &{{$k.name}}IteratorScanner{
input: newBuf{{$k.Name}}Iterator(input),
keys: keys,
defaultValue: defaultValue,
}
}
func (s *{{$k.name}}IteratorScanner) Peek() (int64, string, Tags) {
if s.err != nil {
return ZeroTime, "", Tags{}
}
p, err := s.input.peek()
if err != nil {
s.err = err
return ZeroTime, "", Tags{}
} else if p == nil {
return ZeroTime, "", Tags{}
}
return p.Time, p.Name, p.Tags
}
func (s *{{$k.name}}IteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) {
if s.err != nil {
return
}
p, err := s.input.Next()
if err != nil {
s.err = err
return
} else if p == nil {
s.useDefaults(m)
return
} else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) {
s.useDefaults(m)
s.input.unread(p)
return
}
if k := s.keys[0]; k != "" {
if p.Nil {
if s.defaultValue != SkipDefault {
m[k] = s.defaultValue
}
} else {
m[k] = p.Value
}
}
for i, v := range p.Aux {
k := s.keys[i+1]
switch v.(type) {
case float64, int64, uint64, string, bool:
m[k] = v
default:
// Insert the fill value if one was specified.
if s.defaultValue != SkipDefault {
m[k] = s.defaultValue
}
}
}
}
func (s *{{$k.name}}IteratorScanner) useDefaults(m map[string]interface{}) {
if s.defaultValue == SkipDefault {
return
}
for _, k := range s.keys {
if k == "" {
continue
}
m[k] = s.defaultValue
}
}
func (s *{{$k.name}}IteratorScanner) Stats() IteratorStats { return s.input.Stats() }
func (s *{{$k.name}}IteratorScanner) Err() error { return s.err }
func (s *{{$k.name}}IteratorScanner) Close() error { return s.input.Close() }
// {{$k.name}}ParallelIterator represents an iterator that pulls data in a separate goroutine.
type {{$k.name}}ParallelIterator struct {
input {{$k.Name}}Iterator
@ -906,170 +995,6 @@ func (itr *{{$k.name}}CloseInterruptIterator) Next() (*{{$k.Name}}Point, error)
return p, nil
}
// aux{{$k.Name}}Point represents a combination of a point and an error for the AuxIterator.
type aux{{$k.Name}}Point struct {
point *{{$k.Name}}Point
err error
}
// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator.
type {{$k.name}}AuxIterator struct {
input *buf{{$k.Name}}Iterator
output chan aux{{$k.Name}}Point
fields *auxIteratorFields
background bool
closer sync.Once
}
func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}AuxIterator {
return &{{$k.name}}AuxIterator{
input: newBuf{{$k.Name}}Iterator(input),
output: make(chan aux{{$k.Name}}Point, 1),
fields: newAuxIteratorFields(opt),
}
}
func (itr *{{$k.name}}AuxIterator) Background() {
itr.background = true
itr.Start()
go DrainIterator(itr)
}
func (itr *{{$k.name}}AuxIterator) Start() { go itr.stream() }
func (itr *{{$k.name}}AuxIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *{{$k.name}}AuxIterator) Close() error {
var err error
itr.closer.Do(func() { err = itr.input.Close() })
return err
}
func (itr *{{$k.name}}AuxIterator) Next() (*{{$k.Name}}Point, error) {
p := <-itr.output
return p.point, p.err
}
func (itr *{{$k.name}}AuxIterator) Iterator(name string, typ influxql.DataType) Iterator { return itr.fields.iterator(name, typ) }
func (itr *{{.name}}AuxIterator) stream() {
for {
// Read next point.
p, err := itr.input.Next()
if err != nil {
itr.output <- aux{{$k.Name}}Point{err: err}
itr.fields.sendError(err)
break
} else if p == nil {
break
}
// Send point to output and to each field iterator.
itr.output <- aux{{$k.Name}}Point{point: p}
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)
itr.fields.close()
}
// {{$k.name}}ChanIterator represents a new instance of {{$k.name}}ChanIterator.
type {{$k.name}}ChanIterator struct {
buf struct {
i int
filled bool
points [2]{{$k.Name}}Point
}
err error
cond *sync.Cond
done bool
}
func (itr *{{$k.name}}ChanIterator) Stats() IteratorStats { return IteratorStats{} }
func (itr *{{$k.name}}ChanIterator) Close() error {
itr.cond.L.Lock()
// Mark the channel iterator as done and signal all waiting goroutines to start again.
itr.done = true
itr.cond.Broadcast()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return nil
}
func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool {
itr.cond.L.Lock()
defer itr.cond.L.Unlock()
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf.filled {
itr.cond.Wait()
}
// Do not set the value and return false to signal that the iterator is closed.
// Do this after the above wait as the above for loop may have exited because
// the iterator was closed.
if itr.done {
return false
}
switch v := value.(type) {
case {{$k.Type}}:
itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: v}
{{if eq $k.Name "Float"}}
case int64:
itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: float64(v)}
{{end}}
default:
itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Nil: true}
}
itr.buf.filled = true
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
}
func (itr *{{$k.name}}ChanIterator) setErr(err error) {
itr.cond.L.Lock()
defer itr.cond.L.Unlock()
itr.err = err
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
}
func (itr *{{$k.name}}ChanIterator) Next() (*{{$k.Name}}Point, error) {
itr.cond.L.Lock()
defer itr.cond.L.Unlock()
// Check for an error and return one if there.
if itr.err != nil {
return nil, itr.err
}
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && !itr.buf.filled {
itr.cond.Wait()
}
// Return nil once the channel is done and the buffer is empty.
if itr.done && !itr.buf.filled {
return nil, nil
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := &itr.buf.points[itr.buf.i]
itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points)
itr.buf.filled = false
itr.cond.Signal()
return p, nil
}
{{range $v := $types}}
// {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result.
@ -1620,13 +1545,7 @@ func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) {
}
func (itr *{{$k.name}}IteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
if cur, ok := itr.cur.(*cursor); ok {
for _, itr := range cur.itrs {
stats.Add(itr.Stats())
}
}
return stats
return itr.cur.Stats()
}
func (itr *{{$k.name}}IteratorMapper) Close() error {

View File

@ -7,7 +7,6 @@ import (
"fmt"
"io"
"regexp"
"sync"
"time"
"github.com/gogo/protobuf/proto"
@ -404,175 +403,43 @@ func NewCloseInterruptIterator(input Iterator, closing <-chan struct{}) Iterator
}
}
// AuxIterator represents an iterator that can split off separate auxiliary iterators.
type AuxIterator interface {
Iterator
// IteratorScanner is used to scan the results of an iterator into a map.
type IteratorScanner interface {
// Peek retrieves information about the next point. It returns a timestamp, the name, and the tags.
Peek() (int64, string, Tags)
// Auxilary iterator
Iterator(name string, typ influxql.DataType) Iterator
// ScanAt will take a time, name, and tags and scan the point that matches those into the map.
ScanAt(ts int64, name string, tags Tags, values map[string]interface{})
// Start starts writing to the created iterators.
Start()
// Stats returns the IteratorStats from the Iterator.
Stats() IteratorStats
// Backgrounds the iterator so that, when start is called, it will
// continuously read from the iterator.
Background()
// Err returns an error that was encountered while scanning.
Err() error
io.Closer
}
// NewAuxIterator returns a new instance of AuxIterator.
func NewAuxIterator(input Iterator, opt IteratorOptions) AuxIterator {
// SkipDefault is a sentinel value to tell the IteratorScanner to skip setting the
// default value if none was present. This causes the map to use the previous value
// if it was previously set.
var SkipDefault = interface{}(0)
// NewIteratorScanner produces an IteratorScanner for the Iterator.
func NewIteratorScanner(input Iterator, keys []string, defaultValue interface{}) IteratorScanner {
switch input := input.(type) {
case FloatIterator:
return newFloatAuxIterator(input, opt)
return newFloatIteratorScanner(input, keys, defaultValue)
case IntegerIterator:
return newIntegerAuxIterator(input, opt)
return newIntegerIteratorScanner(input, keys, defaultValue)
case UnsignedIterator:
return newUnsignedAuxIterator(input, opt)
return newUnsignedIteratorScanner(input, keys, defaultValue)
case StringIterator:
return newStringAuxIterator(input, opt)
return newStringIteratorScanner(input, keys, defaultValue)
case BooleanIterator:
return newBooleanAuxIterator(input, opt)
return newBooleanIteratorScanner(input, keys, defaultValue)
default:
panic(fmt.Sprintf("unsupported aux iterator type: %T", input))
}
}
// auxIteratorField represents an auxilary field within an AuxIterator.
type auxIteratorField struct {
name string // field name
typ influxql.DataType // detected data type
itrs []Iterator // auxillary iterators
mu sync.Mutex
opt IteratorOptions
}
func (f *auxIteratorField) append(itr Iterator) {
f.mu.Lock()
defer f.mu.Unlock()
f.itrs = append(f.itrs, itr)
}
func (f *auxIteratorField) close() {
f.mu.Lock()
defer f.mu.Unlock()
for _, itr := range f.itrs {
itr.Close()
}
}
type auxIteratorFields struct {
fields []*auxIteratorField
dimensions []string
}
// newAuxIteratorFields returns a new instance of auxIteratorFields from a list of field names.
func newAuxIteratorFields(opt IteratorOptions) *auxIteratorFields {
fields := make([]*auxIteratorField, len(opt.Aux))
for i, ref := range opt.Aux {
fields[i] = &auxIteratorField{name: ref.Val, typ: ref.Type, opt: opt}
}
return &auxIteratorFields{
fields: fields,
dimensions: opt.GetDimensions(),
}
}
func (a *auxIteratorFields) close() {
for _, f := range a.fields {
f.close()
}
}
// iterator creates a new iterator for a named auxilary field.
func (a *auxIteratorFields) iterator(name string, typ influxql.DataType) Iterator {
for _, f := range a.fields {
// Skip field if it's name doesn't match.
// Exit if no points were received by the iterator.
if f.name != name || (typ != influxql.Unknown && f.typ != typ) {
continue
}
// Create channel iterator by data type.
switch f.typ {
case influxql.Float:
itr := &floatChanIterator{cond: sync.NewCond(&sync.Mutex{})}
f.append(itr)
return itr
case influxql.Integer:
itr := &integerChanIterator{cond: sync.NewCond(&sync.Mutex{})}
f.append(itr)
return itr
case influxql.Unsigned:
itr := &unsignedChanIterator{cond: sync.NewCond(&sync.Mutex{})}
f.append(itr)
return itr
case influxql.String, influxql.Tag:
itr := &stringChanIterator{cond: sync.NewCond(&sync.Mutex{})}
f.append(itr)
return itr
case influxql.Boolean:
itr := &booleanChanIterator{cond: sync.NewCond(&sync.Mutex{})}
f.append(itr)
return itr
default:
}
}
return &nilFloatIterator{}
}
// send sends a point to all field iterators.
func (a *auxIteratorFields) send(p Point) (ok bool) {
values := p.aux()
for i, f := range a.fields {
var v interface{}
if i < len(values) {
v = values[i]
}
tags := p.tags()
tags = tags.Subset(a.dimensions)
// Send new point for each aux iterator.
// Primitive pointers represent nil values.
for _, itr := range f.itrs {
switch itr := itr.(type) {
case *floatChanIterator:
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
case *integerChanIterator:
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
case *unsignedChanIterator:
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
case *stringChanIterator:
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
case *booleanChanIterator:
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
default:
panic(fmt.Sprintf("invalid aux itr type: %T", itr))
}
}
}
return ok
}
func (a *auxIteratorFields) sendError(err error) {
for _, f := range a.fields {
for _, itr := range f.itrs {
switch itr := itr.(type) {
case *floatChanIterator:
itr.setErr(err)
case *integerChanIterator:
itr.setErr(err)
case *unsignedChanIterator:
itr.setErr(err)
case *stringChanIterator:
itr.setErr(err)
case *booleanChanIterator:
itr.setErr(err)
default:
panic(fmt.Sprintf("invalid aux itr type: %T", itr))
}
}
panic(fmt.Sprintf("unsupported type for iterator scanner: %T", input))
}
}
@ -1231,50 +1098,6 @@ func decodeMeasurement(pb *internal.Measurement) (*influxql.Measurement, error)
return mm, nil
}
// selectInfo represents an object that stores info about select fields.
type selectInfo struct {
calls map[*influxql.Call]struct{}
refs map[*influxql.VarRef]struct{}
}
// newSelectInfo creates a object with call and var ref info from stmt.
func newSelectInfo(stmt *influxql.SelectStatement) *selectInfo {
info := &selectInfo{
calls: make(map[*influxql.Call]struct{}),
refs: make(map[*influxql.VarRef]struct{}),
}
influxql.Walk(info, stmt.Fields)
return info
}
func (v *selectInfo) Visit(n influxql.Node) influxql.Visitor {
switch n := n.(type) {
case *influxql.Call:
v.calls[n] = struct{}{}
return nil
case *influxql.VarRef:
v.refs[n] = struct{}{}
return nil
}
return v
}
// FindSelector returns a selector from the selectInfo. This will only
// return a selector if the Call is a selector and it's the only function
// in the selectInfo.
func (v *selectInfo) FindSelector() *influxql.Call {
if len(v.calls) != 1 {
return nil
}
for s := range v.calls {
if influxql.IsSelector(s) {
return s
}
}
return nil
}
// Interval represents a repeating interval for a query.
type Interval struct {
Duration time.Duration
@ -1332,79 +1155,6 @@ func (itr *nilFloatReaderIterator) Close() error {
}
func (*nilFloatReaderIterator) Next() (*FloatPoint, error) { return nil, nil }
// integerFloatTransformIterator executes a function to modify an existing point for every
// output of the input iterator.
type integerFloatTransformIterator struct {
input IntegerIterator
fn integerFloatTransformFunc
}
// Stats returns stats from the input iterator.
func (itr *integerFloatTransformIterator) Stats() IteratorStats { return itr.input.Stats() }
// Close closes the iterator and all child iterators.
func (itr *integerFloatTransformIterator) Close() error { return itr.input.Close() }
// Next returns the minimum value for the next available interval.
func (itr *integerFloatTransformIterator) Next() (*FloatPoint, error) {
p, err := itr.input.Next()
if err != nil {
return nil, err
} else if p != nil {
return itr.fn(p), nil
}
return nil, nil
}
// integerFloatTransformFunc creates or modifies a point.
// The point passed in may be modified and returned rather than allocating a
// new point if possible.
type integerFloatTransformFunc func(p *IntegerPoint) *FloatPoint
type integerFloatCastIterator struct {
input IntegerIterator
point FloatPoint
}
func (itr *integerFloatCastIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *integerFloatCastIterator) Close() error { return itr.input.Close() }
func (itr *integerFloatCastIterator) Next() (*FloatPoint, error) {
p, err := itr.input.Next()
if p == nil || err != nil {
return nil, err
}
itr.point.Name = p.Name
itr.point.Tags = p.Tags
itr.point.Time = p.Time
itr.point.Nil = p.Nil
itr.point.Value = float64(p.Value)
itr.point.Aux = p.Aux
return &itr.point, nil
}
type unsignedFloatCastIterator struct {
input UnsignedIterator
point FloatPoint
}
func (itr *unsignedFloatCastIterator) Stats() IteratorStats { return itr.input.Stats() }
func (itr *unsignedFloatCastIterator) Close() error { return itr.input.Close() }
func (itr *unsignedFloatCastIterator) Next() (*FloatPoint, error) {
p, err := itr.input.Next()
if p == nil || err != nil {
return nil, err
}
itr.point.Name = p.Name
itr.point.Tags = p.Tags
itr.point.Time = p.Time
itr.point.Nil = p.Nil
itr.point.Value = float64(p.Value)
itr.point.Aux = p.Aux
return &itr.point, nil
}
// IteratorStats represents statistics about an iterator.
// Some statistics are available immediately upon iterator creation while
// some are derived as the iterator processes data.

View File

@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"math"
"reflect"
"strings"
"testing"
@ -770,44 +769,6 @@ func TestLimitIterator_Boolean(t *testing.T) {
}
}
// Ensure auxiliary iterators can be created for auxilary fields.
func TestFloatAuxIterator(t *testing.T) {
itr := query.NewAuxIterator(
&FloatIterator{Points: []query.FloatPoint{
{Time: 0, Value: 1, Aux: []interface{}{float64(100), float64(200)}},
{Time: 1, Value: 2, Aux: []interface{}{float64(500), math.NaN()}},
}},
query.IteratorOptions{Aux: []influxql.VarRef{{Val: "f0", Type: influxql.Float}, {Val: "f1", Type: influxql.Float}}},
)
itrs := []query.Iterator{
itr,
itr.Iterator("f0", influxql.Unknown),
itr.Iterator("f1", influxql.Unknown),
itr.Iterator("f0", influxql.Unknown),
}
itr.Start()
if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]query.Point{
{
&query.FloatPoint{Time: 0, Value: 1, Aux: []interface{}{float64(100), float64(200)}},
&query.FloatPoint{Time: 0, Value: float64(100)},
&query.FloatPoint{Time: 0, Value: float64(200)},
&query.FloatPoint{Time: 0, Value: float64(100)},
},
{
&query.FloatPoint{Time: 1, Value: 2, Aux: []interface{}{float64(500), math.NaN()}},
&query.FloatPoint{Time: 1, Value: float64(500)},
&query.FloatPoint{Time: 1, Value: math.NaN()},
&query.FloatPoint{Time: 1, Value: float64(500)},
},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}
// Ensure limit iterator returns a subset of points.
func TestLimitIterator(t *testing.T) {
itr := query.NewLimitIterator(

47
query/math.go Normal file
View File

@ -0,0 +1,47 @@
package query
import (
"time"
"github.com/influxdata/influxql"
)
func isMathFunction(call *influxql.Call) bool {
return false
}
type MathTypeMapper struct{}
func (MathTypeMapper) MapType(measurement *influxql.Measurement, field string) influxql.DataType {
return influxql.Unknown
}
func (MathTypeMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error) {
// TODO(jsternberg): Put math function call types here.
return influxql.Unknown, nil
}
type MathValuer struct {
Valuer influxql.Valuer
}
func (v *MathValuer) Value(key string) (interface{}, bool) {
if v.Valuer != nil {
return v.Valuer.Value(key)
}
return nil, false
}
func (v *MathValuer) Call(name string, args []influxql.Expr) (interface{}, bool) {
if v, ok := v.Valuer.(influxql.CallValuer); ok {
return v.Call(name, args)
}
return nil, false
}
func (v *MathValuer) Zone() *time.Location {
if v, ok := v.Valuer.(influxql.ZoneValuer); ok {
return v.Zone()
}
return nil
}

View File

@ -29,14 +29,14 @@ func MonitorFromContext(ctx context.Context) Monitor {
// PointLimitMonitor is a query monitor that exits when the number of points
// emitted exceeds a threshold.
func PointLimitMonitor(itrs Iterators, interval time.Duration, limit int) MonitorFunc {
func PointLimitMonitor(cur Cursor, interval time.Duration, limit int) MonitorFunc {
return func(closing <-chan struct{}) error {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
stats := itrs.Stats()
stats := cur.Stats()
if stats.PointN >= limit {
return ErrMaxSelectPointsLimitExceeded(stats.PointN, limit)
}

File diff suppressed because it is too large Load Diff

View File

@ -2631,6 +2631,32 @@ func TestSelect(t *testing.T) {
{Time: 22 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{7.953140268154609}},
},
},
{
name: "DuplicateSelectors",
q: `SELECT min(value) * 2, min(value) / 2 FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`,
typ: influxql.Float,
expr: `min(value::float)`,
itrs: []query.Iterator{
&FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20},
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3},
{Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100},
}},
&FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19},
{Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2},
}},
&FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10},
}},
},
rows: []query.Row{
{Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{float64(38), float64(19) / 2}},
{Time: 10 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{float64(4), float64(1)}},
{Time: 30 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{float64(200), float64(50)}},
{Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{float64(20), float64(5)}},
},
},
} {
t.Run(tt.name, func(t *testing.T) {
shardMapper := ShardMapper{
@ -2866,7 +2892,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
{
Name: "Integer_AdditionRHS_Unsigned",
Statement: `SELECT i + 9223372036854775808 FROM cpu`,
Err: `cannot use + with an integer and unsigned`,
Err: `type error: i::integer + 9223372036854775808: cannot use + with an integer and unsigned literal`,
},
{
Name: "Unsigned_AdditionRHS_Unsigned",
@ -2943,7 +2969,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
{
Name: "Integer_AdditionLHS_Unsigned",
Statement: `SELECT 9223372036854775808 + i FROM cpu`,
Err: `cannot use + with unsigned and an integer`,
Err: `type error: 9223372036854775808 + i::integer: cannot use + with an integer and unsigned literal`,
},
{
Name: "Unsigned_AdditionLHS_Unsigned",
@ -3002,7 +3028,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
{
Name: "Integer_Add_Unsigned",
Statement: `SELECT i + u FROM cpu`,
Err: `cannot use + between an integer and unsigned, an explicit cast is required`,
Err: `type error: i::integer + u::unsigned: cannot use + between an integer and unsigned, an explicit cast is required`,
},
{
Name: "Float_MultiplicationRHS_Number",
@ -3167,7 +3193,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
{
Name: "Integer_Multiply_Unsigned",
Statement: `SELECT i * u FROM cpu`,
Err: `cannot use * between an integer and unsigned, an explicit cast is required`,
Err: `type error: i::integer * u::unsigned: cannot use * between an integer and unsigned, an explicit cast is required`,
},
{
Name: "Float_SubtractionRHS_Number",
@ -3235,7 +3261,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
{
Name: "Integer_SubtractionRHS_Unsigned",
Statement: `SELECT i - 9223372036854775808 FROM cpu`,
Err: `cannot use - with an integer and unsigned`,
Err: `type error: i::integer - 9223372036854775808: cannot use - with an integer and unsigned literal`,
},
// Skip Unsigned_SubtractionRHS_Integer because it would result in underflow.
{
@ -3304,7 +3330,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
{
Name: "Integer_SubtractionLHS_Unsigned",
Statement: `SELECT 9223372036854775808 - i FROM cpu`,
Err: `cannot use - with unsigned and an integer`,
Err: `type error: 9223372036854775808 - i::integer: cannot use - with an integer and unsigned literal`,
},
{
Name: "Unsigned_SubtractionLHS_Unsigned",
@ -3363,7 +3389,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
{
Name: "Integer_Subtract_Unsigned",
Statement: `SELECT i - u FROM cpu`,
Err: `cannot use - between an integer and unsigned, an explicit cast is required`,
Err: `type error: i::integer - u::unsigned: cannot use - between an integer and unsigned, an explicit cast is required`,
},
{
Name: "Float_DivisionRHS_Number",
@ -3431,7 +3457,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
{
Name: "Integer_DivisionRHS_Unsigned",
Statement: `SELECT i / 9223372036854775808 FROM cpu`,
Err: `cannot use / with an integer and unsigned`,
Err: `type error: i::integer / 9223372036854775808: cannot use / with an integer and unsigned literal`,
},
{
Name: "Unsigned_DivisionRHS_Unsigned",
@ -3508,7 +3534,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
{
Name: "Integer_DivisionLHS_Unsigned",
Statement: `SELECT 9223372036854775808 / i FROM cpu`,
Err: `cannot use / with unsigned and an integer`,
Err: `type error: 9223372036854775808 / i::integer: cannot use / with an integer and unsigned literal`,
},
{
Name: "Unsigned_DivisionLHS_Unsigned",
@ -3567,7 +3593,7 @@ func TestSelect_BinaryExpr(t *testing.T) {
{
Name: "Integer_Divide_Unsigned",
Statement: `SELECT i / u FROM cpu`,
Err: `cannot use / between an integer and unsigned, an explicit cast is required`,
Err: `type error: i::integer / u::unsigned: cannot use / between an integer and unsigned, an explicit cast is required`,
},
{
Name: "Integer_BitwiseAndRHS",
@ -3741,9 +3767,9 @@ func TestSelect_BinaryExpr_NilValues(t *testing.T) {
t.Fatalf("unexpected source: %s", m.Name)
}
return &FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20, Aux: []interface{}{float64(20), nil}},
{Name: "cpu", Time: 5 * Second, Value: 10, Aux: []interface{}{float64(10), float64(15)}},
{Name: "cpu", Time: 9 * Second, Value: 19, Aux: []interface{}{nil, float64(5)}},
{Name: "cpu", Time: 0 * Second, Aux: []interface{}{float64(20), nil}},
{Name: "cpu", Time: 5 * Second, Aux: []interface{}{float64(10), float64(15)}},
{Name: "cpu", Time: 9 * Second, Aux: []interface{}{nil, float64(5)}},
}}, nil
},
}

View File

@ -30,11 +30,10 @@ func (b *subqueryBuilder) buildAuxIterator(ctx context.Context, opt IteratorOpti
}
subOpt.Aux = auxFields
itrs, err := buildIterators(ctx, b.stmt, b.ic, subOpt)
cur, err := buildCursor(ctx, b.stmt, b.ic, subOpt)
if err != nil {
return nil, err
}
cur := b.buildCursor(itrs, subOpt.Ascending)
// Construct the iterators for the subquery.
input := NewIteratorMapper(cur, nil, indexes, subOpt)
@ -121,11 +120,10 @@ func (b *subqueryBuilder) buildVarRefIterator(ctx context.Context, expr *influxq
}
subOpt.Aux = auxFields
itrs, err := buildIterators(ctx, b.stmt, b.ic, subOpt)
cur, err := buildCursor(ctx, b.stmt, b.ic, subOpt)
if err != nil {
return nil, err
}
cur := b.buildCursor(itrs, subOpt.Ascending)
// Construct the iterators for the subquery.
input := NewIteratorMapper(cur, driver, indexes, subOpt)
@ -135,18 +133,3 @@ func (b *subqueryBuilder) buildVarRefIterator(ctx context.Context, expr *influxq
}
return input, nil
}
func (b *subqueryBuilder) buildCursor(itrs []Iterator, ascending bool) Cursor {
columnNames := b.stmt.ColumnNames()
columns := make([]influxql.VarRef, len(itrs))
for i, itr := range itrs {
columns[i] = influxql.VarRef{
Val: columnNames[i+1],
Type: iteratorDataType(itr),
}
}
cur := newCursor(itrs, columns, ascending)
cur.omitTime = true
return cur
}

View File

@ -5570,13 +5570,12 @@ func TestServer_Query_PercentileDerivative(t *testing.T) {
},
}...)
for i, query := range test.queries {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
for _, query := range test.queries {
t.Run(query.name, func(t *testing.T) {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Skipf("SKIP:: %s", query.name)
}
@ -5615,13 +5614,12 @@ func TestServer_Query_UnderscoreMeasurement(t *testing.T) {
},
}...)
for i, query := range test.queries {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
for _, query := range test.queries {
t.Run(query.name, func(t *testing.T) {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Skipf("SKIP:: %s", query.name)
}

View File

@ -1221,6 +1221,11 @@ func (a Shards) MapType(measurement, field string) influxql.DataType {
return typ
}
func (a Shards) CallType(name string, args []influxql.DataType) (influxql.DataType, error) {
typmap := query.CallTypeMapper{}
return typmap.CallType(name, args)
}
func (a Shards) CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
switch measurement.SystemIterator {
case "_series":