diff --git a/CHANGELOG.md b/CHANGELOG.md index 775d05f93c..7244fca329 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ v1.6.0 [unreleased] ------------------- +### Breaking changes + +- If math is used with the same selector multiple times, it will now act as a selector rather than an aggregate. See [#9563](https://github.com/influxdata/influxdb/pull/9563) for details. + ### Features - [#9429](https://github.com/influxdata/influxdb/pull/9429): Support proxy environment variables in the influx client. diff --git a/query/compile.go b/query/compile.go index 907d9285b3..0e0458d2ff 100644 --- a/query/compile.go +++ b/query/compile.go @@ -95,13 +95,13 @@ func newCompiler(opt CompileOptions) *compiledStatement { func Compile(stmt *influxql.SelectStatement, opt CompileOptions) (Statement, error) { c := newCompiler(opt) - if err := c.preprocess(stmt); err != nil { - return nil, err - } - if err := c.compile(stmt); err != nil { - return nil, err - } c.stmt = stmt.Clone() + if err := c.preprocess(c.stmt); err != nil { + return nil, err + } + if err := c.compile(c.stmt); err != nil { + return nil, err + } c.stmt.TimeAlias = c.TimeFieldName c.stmt.Condition = c.Condition @@ -127,6 +127,10 @@ func (c *compiledStatement) preprocess(stmt *influxql.SelectStatement) error { if err != nil { return err } + // Verify that the condition is actually ok to use. + if err := c.validateCondition(cond); err != nil { + return err + } c.Condition = cond c.TimeRange = t @@ -170,6 +174,7 @@ func (c *compiledStatement) compile(stmt *influxql.SelectStatement) error { for _, source := range stmt.Sources { switch source := source.(type) { case *influxql.SubQuery: + source.Statement.OmitTime = true if err := c.subquery(source.Statement); err != nil { return err } @@ -179,6 +184,8 @@ func (c *compiledStatement) compile(stmt *influxql.SelectStatement) error { } func (c *compiledStatement) compileFields(stmt *influxql.SelectStatement) error { + valuer := MathValuer{} + c.Fields = make([]*compiledField, 0, len(stmt.Fields)) for _, f := range stmt.Fields { // Remove any time selection (it is automatically selected by default) @@ -194,12 +201,10 @@ func (c *compiledStatement) compileFields(stmt *influxql.SelectStatement) error } // Append this field to the list of processed fields and compile it. + f.Expr = influxql.Reduce(f.Expr, &valuer) field := &compiledField{ - global: c, - Field: &influxql.Field{ - Expr: influxql.Reduce(f.Expr, nil), - Alias: f.Alias, - }, + global: c, + Field: f, AllowWildcard: true, } c.Fields = append(c.Fields, field) @@ -244,6 +249,11 @@ func (c *compiledField) compileExpr(expr influxql.Expr) error { c.global.HasAuxiliaryFields = true return nil case *influxql.Call: + if isMathFunction(expr) { + // TODO(jsternberg): Implement validation for any math functions. + return nil + } + // Register the function call in the list of function calls. c.global.FunctionCalls = append(c.global.FunctionCalls, expr) @@ -305,6 +315,8 @@ func (c *compiledField) compileExpr(expr influxql.Expr) error { } case *influxql.ParenExpr: return c.compileExpr(expr.Expr) + case influxql.Literal: + return errors.New("field must contain at least one variable") } return errors.New("unimplemented") } @@ -752,6 +764,35 @@ func (c *compiledStatement) validateFields() error { return nil } +// validateCondition verifies that all elements in the condition are appropriate. +// For example, aggregate calls don't work in the condition and should throw an +// error as an invalid expression. +func (c *compiledStatement) validateCondition(expr influxql.Expr) error { + switch expr := expr.(type) { + case *influxql.BinaryExpr: + // Verify each side of the binary expression. We do not need to + // verify the binary expression itself since that should have been + // done by influxql.ConditionExpr. + if err := c.validateCondition(expr.LHS); err != nil { + return err + } + if err := c.validateCondition(expr.RHS); err != nil { + return err + } + return nil + case *influxql.Call: + if !isMathFunction(expr) { + return fmt.Errorf("invalid function call in condition: %s", expr) + } + if exp, got := 1, len(expr.Args); exp != got { + return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got) + } + return c.validateCondition(expr.Args[0]) + default: + return nil + } +} + // subquery compiles and validates a compiled statement for the subquery using // this compiledStatement as the parent. func (c *compiledStatement) subquery(stmt *influxql.SelectStatement) error { @@ -763,8 +804,11 @@ func (c *compiledStatement) subquery(stmt *influxql.SelectStatement) error { // Substitute now() into the subquery condition. Then use ConditionExpr to // validate the expression. Do not store the results. We have no way to store // and read those results at the moment. - valuer := influxql.NowValuer{Now: c.Options.Now, Location: stmt.Location} - stmt.Condition = influxql.Reduce(stmt.Condition, &valuer) + valuer := influxql.MultiValuer( + &influxql.NowValuer{Now: c.Options.Now, Location: stmt.Location}, + &MathValuer{}, + ) + stmt.Condition = influxql.Reduce(stmt.Condition, valuer) // If the ordering is different and the sort field was specified for the subquery, // throw an error. @@ -843,6 +887,12 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions) return nil, err } + // Validate if the types are correct now that they have been assigned. + if err := validateTypes(stmt); err != nil { + shards.Close() + return nil, err + } + // Determine base options for iterators. opt, err := newIteratorOptionsStmt(stmt, sopt) if err != nil { diff --git a/query/cursor.go b/query/cursor.go index d400190660..f39cfd826a 100644 --- a/query/cursor.go +++ b/query/cursor.go @@ -1,7 +1,6 @@ package query import ( - "fmt" "time" "github.com/influxdata/influxql" @@ -73,6 +72,9 @@ type Cursor interface { // the previous values will be overwritten while using the same memory. Scan(row *Row) bool + // Stats returns the IteratorStats from the underlying iterators. + Stats() IteratorStats + // Err returns any errors that were encountered from scanning the rows. Err() error @@ -83,188 +85,6 @@ type Cursor interface { Close() error } -type cursor struct { - buf []Point - itrs []Iterator - ascending bool - - series Series - err error - - columns []influxql.VarRef - omitTime bool - loc *time.Location -} - -func newCursor(itrs []Iterator, columns []influxql.VarRef, ascending bool) *cursor { - return &cursor{ - buf: make([]Point, len(itrs)), - itrs: itrs, - ascending: ascending, - columns: columns, - loc: time.UTC, - } -} - -func (cur *cursor) Scan(row *Row) bool { - // Immediately end the scan if there are no iterators. - if len(cur.itrs) == 0 { - return false - } - - t, name, tags, err := cur.loadBuf() - if err != nil { - cur.err = err - return false - } - - if t == ZeroTime { - return false - } - row.Time = t - - // Check to see if the name or tags have changed. - // If they have, copy them over and update the series id. - if name != cur.series.Name || tags.ID() != cur.series.Tags.ID() { - cur.series.id++ - cur.series.Name = name - cur.series.Tags = tags - } - row.Series = cur.series - - // If the row values is not large enough, allocate a slice. - if len(row.Values) < len(cur.columns) { - row.Values = make([]interface{}, len(cur.columns)) - } - if !cur.omitTime { - row.Values[0] = time.Unix(0, t).In(cur.loc) - cur.readInto(t, name, tags, row.Values[1:]) - } else { - cur.readInto(t, name, tags, row.Values) - } - return true -} - -// loadBuf reads in points into empty buffer slots. -// Returns the next time/name/tags to emit for. -func (cur *cursor) loadBuf() (t int64, name string, tags Tags, err error) { - t = ZeroTime - - for i := range cur.itrs { - // Load buffer, if empty. - if cur.buf[i] == nil { - cur.buf[i], err = cur.readIterator(cur.itrs[i]) - if err != nil { - break - } - } - - // Skip if buffer is empty. - p := cur.buf[i] - if p == nil { - continue - } - itrTime, itrName, itrTags := p.time(), p.name(), p.tags() - // Initialize range values if not set. - if t == ZeroTime { - t, name, tags = itrTime, itrName, itrTags - continue - } - - // Update range values if lower and emitter is in time ascending order. - if cur.ascending { - if (itrName < name) || (itrName == name && itrTags.ID() < tags.ID()) || (itrName == name && itrTags.ID() == tags.ID() && itrTime < t) { - t, name, tags = itrTime, itrName, itrTags - } - continue - } - - // Update range values if higher and emitter is in time descending order. - if (itrName > name) || (itrName == name && itrTags.ID() > tags.ID()) || (itrName == name && itrTags.ID() == tags.ID() && itrTime > t) { - t, name, tags = itrTime, itrName, itrTags - } - } - return -} - -func (cur *cursor) readInto(t int64, name string, tags Tags, values []interface{}) { - for i, p := range cur.buf { - // Skip if buffer is empty. - if p == nil { - values[i] = nil - continue - } - - // Skip point if it doesn't match time/name/tags. - pTags := p.tags() - if p.time() != t || p.name() != name || !pTags.Equals(&tags) { - values[i] = nil - continue - } - - // Read point value. - values[i] = p.value() - - // Clear buffer. - cur.buf[i] = nil - } -} - -// readIterator reads the next point from itr. -func (cur *cursor) readIterator(itr Iterator) (Point, error) { - if itr == nil { - return nil, nil - } - - switch itr := itr.(type) { - case FloatIterator: - if p, err := itr.Next(); err != nil { - return nil, err - } else if p != nil { - return p, nil - } - case IntegerIterator: - if p, err := itr.Next(); err != nil { - return nil, err - } else if p != nil { - return p, nil - } - case UnsignedIterator: - if p, err := itr.Next(); err != nil { - return nil, err - } else if p != nil { - return p, nil - } - case StringIterator: - if p, err := itr.Next(); err != nil { - return nil, err - } else if p != nil { - return p, nil - } - case BooleanIterator: - if p, err := itr.Next(); err != nil { - return nil, err - } else if p != nil { - return p, nil - } - default: - panic(fmt.Sprintf("unsupported iterator: %T", itr)) - } - return nil, nil -} - -func (cur *cursor) Err() error { - return cur.err -} - -func (cur *cursor) Columns() []influxql.VarRef { - return cur.columns -} - -func (cur *cursor) Close() error { - return Iterators(cur.itrs).Close() -} - // RowCursor returns a Cursor that iterates over Rows. func RowCursor(rows []Row, columns []influxql.VarRef) Cursor { return &rowCursor{ @@ -295,6 +115,10 @@ func (cur *rowCursor) Scan(row *Row) bool { return true } +func (cur *rowCursor) Stats() IteratorStats { + return IteratorStats{} +} + func (cur *rowCursor) Err() error { return nil } @@ -307,6 +131,195 @@ func (cur *rowCursor) Close() error { return nil } +type scannerFunc func(m map[string]interface{}) (int64, string, Tags) + +type scannerCursorBase struct { + fields []influxql.Expr + m map[string]interface{} + + series Series + columns []influxql.VarRef + loc *time.Location + + scan scannerFunc +} + +func newScannerCursorBase(scan scannerFunc, fields []*influxql.Field, loc *time.Location) scannerCursorBase { + typmap := FunctionTypeMapper{} + exprs := make([]influxql.Expr, len(fields)) + columns := make([]influxql.VarRef, len(fields)) + for i, f := range fields { + exprs[i] = f.Expr + columns[i] = influxql.VarRef{ + Val: f.Name(), + Type: influxql.EvalType(f.Expr, nil, typmap), + } + } + if loc == nil { + loc = time.UTC + } + + return scannerCursorBase{ + fields: exprs, + m: make(map[string]interface{}), + columns: columns, + loc: loc, + scan: scan, + } +} + +func (cur *scannerCursorBase) Scan(row *Row) bool { + ts, name, tags := cur.scan(cur.m) + if ts == ZeroTime { + return false + } + + row.Time = ts + if name != cur.series.Name || tags.ID() != cur.series.Tags.ID() { + cur.series.Name = name + cur.series.Tags = tags + cur.series.id++ + } + row.Series = cur.series + + if len(cur.columns) > len(row.Values) { + row.Values = make([]interface{}, len(cur.columns)) + } + + valuer := influxql.ValuerEval{ + Valuer: &MathValuer{ + Valuer: influxql.MapValuer(cur.m), + }, + IntegerFloatDivision: true, + } + for i, expr := range cur.fields { + // A special case if the field is time to reduce memory allocations. + if ref, ok := expr.(*influxql.VarRef); ok && ref.Val == "time" { + row.Values[i] = time.Unix(0, row.Time).In(cur.loc) + continue + } + row.Values[i] = valuer.Eval(expr) + } + return true +} + +func (cur *scannerCursorBase) Columns() []influxql.VarRef { + return cur.columns +} + +var _ Cursor = (*scannerCursor)(nil) + +type scannerCursor struct { + scanner IteratorScanner + scannerCursorBase +} + +func newScannerCursor(s IteratorScanner, fields []*influxql.Field, opt IteratorOptions) *scannerCursor { + cur := &scannerCursor{scanner: s} + cur.scannerCursorBase = newScannerCursorBase(cur.scan, fields, opt.Location) + return cur +} + +func (s *scannerCursor) scan(m map[string]interface{}) (int64, string, Tags) { + ts, name, tags := s.scanner.Peek() + if ts == ZeroTime { + return ts, name, tags + } + s.scanner.ScanAt(ts, name, tags, m) + return ts, name, tags +} + +func (cur *scannerCursor) Stats() IteratorStats { + return cur.scanner.Stats() +} + +func (cur *scannerCursor) Err() error { + return cur.scanner.Err() +} + +func (cur *scannerCursor) Close() error { + return cur.scanner.Close() +} + +var _ Cursor = (*multiScannerCursor)(nil) + +type multiScannerCursor struct { + scanners []IteratorScanner + err error + ascending bool + scannerCursorBase +} + +func newMultiScannerCursor(scanners []IteratorScanner, fields []*influxql.Field, opt IteratorOptions) *multiScannerCursor { + cur := &multiScannerCursor{ + scanners: scanners, + ascending: opt.Ascending, + } + cur.scannerCursorBase = newScannerCursorBase(cur.scan, fields, opt.Location) + return cur +} + +func (cur *multiScannerCursor) scan(m map[string]interface{}) (ts int64, name string, tags Tags) { + ts = ZeroTime + for _, s := range cur.scanners { + curTime, curName, curTags := s.Peek() + if curTime == ZeroTime { + if err := s.Err(); err != nil { + cur.err = err + return ZeroTime, "", Tags{} + } + continue + } + + if ts == ZeroTime { + ts, name, tags = curTime, curName, curTags + continue + } + + if cur.ascending { + if (curName < name) || (curName == name && curTags.ID() < tags.ID()) || (curName == name && curTags.ID() == tags.ID() && curTime < ts) { + ts, name, tags = curTime, curName, curTags + } + continue + } + + if (curName > name) || (curName == name && curTags.ID() > tags.ID()) || (curName == name && curTags.ID() == tags.ID() && curTime > ts) { + ts, name, tags = curTime, curName, curTags + } + } + + if ts == ZeroTime { + return ts, name, tags + } + + for _, s := range cur.scanners { + s.ScanAt(ts, name, tags, m) + } + return ts, name, tags +} + +func (cur *multiScannerCursor) Stats() IteratorStats { + var stats IteratorStats + for _, s := range cur.scanners { + stats.Add(s.Stats()) + } + return stats +} + +func (cur *multiScannerCursor) Err() error { + return cur.err +} + +func (cur *multiScannerCursor) Close() error { + var err error + for _, s := range cur.scanners { + if e := s.Close(); e != nil && err == nil { + err = e + } + } + return err +} + // DrainCursor will read and discard all values from a Cursor and return the error // if one happens. func DrainCursor(cur Cursor) error { diff --git a/query/executor.go b/query/executor.go index b80b45ec1f..04c56c5b16 100644 --- a/query/executor.go +++ b/query/executor.go @@ -152,14 +152,6 @@ func NewContextWithIterators(ctx context.Context, itr *Iterators) context.Contex return context.WithValue(ctx, iteratorsContextKey, itr) } -// tryAddAuxIteratorToContext will capture itr in the *Iterators slice, when configured -// with a call to NewContextWithIterators. -func tryAddAuxIteratorToContext(ctx context.Context, itr AuxIterator) { - if v, ok := ctx.Value(iteratorsContextKey).(*Iterators); ok { - *v = append(*v, itr) - } -} - // StatementExecutor executes a statement within the Executor. type StatementExecutor interface { // ExecuteStatement executes a statement. Results should be sent to the diff --git a/query/functions.go b/query/functions.go index 0c69279ea4..b3d859e891 100644 --- a/query/functions.go +++ b/query/functions.go @@ -31,20 +31,49 @@ func (m FieldMapper) CallType(name string, args []influxql.DataType) (influxql.D return typmap.CallType(name, args) } -type FunctionTypeMapper struct{} +// CallTypeMapper returns the types for call iterator functions. +// Call iterator functions are commonly implemented within the storage engine +// so this mapper is limited to only the return values of those functions. +type CallTypeMapper struct{} + +func (CallTypeMapper) MapType(measurement *influxql.Measurement, field string) influxql.DataType { + return influxql.Unknown +} + +func (CallTypeMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error) { + // If the function is not implemented by the embedded field mapper, then + // see if we implement the function and return the type here. + switch name { + case "mean": + return influxql.Float, nil + case "count": + return influxql.Integer, nil + case "min", "max", "sum", "first", "last": + // TODO(jsternberg): Verify the input type. + return args[0], nil + } + return influxql.Unknown, nil +} + +// FunctionTypeMapper handles the type mapping for all functions implemented by the +// query engine. +type FunctionTypeMapper struct { + CallTypeMapper +} func (FunctionTypeMapper) MapType(measurement *influxql.Measurement, field string) influxql.DataType { return influxql.Unknown } -func (FunctionTypeMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error) { - // If the function is not implemented by the embedded field mapper, then - // see if we implement the function and return the type here. +func (m FunctionTypeMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error) { + if typ, err := m.CallTypeMapper.CallType(name, args); typ != influxql.Unknown || err != nil { + return typ, err + } + + // Handle functions implemented by the query engine. switch name { - case "mean", "median", "integral", "stddev": + case "median", "integral", "stddev": return influxql.Float, nil - case "count": - return influxql.Integer, nil case "elapsed": return influxql.Integer, nil default: diff --git a/query/iterator.gen.go b/query/iterator.gen.go index 160b16b91e..07b1d4a837 100644 --- a/query/iterator.gen.go +++ b/query/iterator.gen.go @@ -491,6 +491,95 @@ type floatSortedMergeHeapItem struct { itr FloatIterator } +// floatIteratorScanner scans the results of a FloatIterator into a map. +type floatIteratorScanner struct { + input *bufFloatIterator + err error + keys []string + defaultValue interface{} +} + +// newFloatIteratorScanner creates a new IteratorScanner. +func newFloatIteratorScanner(input FloatIterator, keys []string, defaultValue interface{}) *floatIteratorScanner { + return &floatIteratorScanner{ + input: newBufFloatIterator(input), + keys: keys, + defaultValue: defaultValue, + } +} + +func (s *floatIteratorScanner) Peek() (int64, string, Tags) { + if s.err != nil { + return ZeroTime, "", Tags{} + } + + p, err := s.input.peek() + if err != nil { + s.err = err + return ZeroTime, "", Tags{} + } else if p == nil { + return ZeroTime, "", Tags{} + } + return p.Time, p.Name, p.Tags +} + +func (s *floatIteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) { + if s.err != nil { + return + } + + p, err := s.input.Next() + if err != nil { + s.err = err + return + } else if p == nil { + s.useDefaults(m) + return + } else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) { + s.useDefaults(m) + s.input.unread(p) + return + } + + if k := s.keys[0]; k != "" { + if p.Nil { + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } else { + m[k] = p.Value + } + } + for i, v := range p.Aux { + k := s.keys[i+1] + switch v.(type) { + case float64, int64, uint64, string, bool: + m[k] = v + default: + // Insert the fill value if one was specified. + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } + } +} + +func (s *floatIteratorScanner) useDefaults(m map[string]interface{}) { + if s.defaultValue == SkipDefault { + return + } + for _, k := range s.keys { + if k == "" { + continue + } + m[k] = s.defaultValue + } +} + +func (s *floatIteratorScanner) Stats() IteratorStats { return s.input.Stats() } +func (s *floatIteratorScanner) Err() error { return s.err } +func (s *floatIteratorScanner) Close() error { return s.input.Close() } + // floatParallelIterator represents an iterator that pulls data in a separate goroutine. type floatParallelIterator struct { input FloatIterator @@ -904,172 +993,6 @@ func (itr *floatCloseInterruptIterator) Next() (*FloatPoint, error) { return p, nil } -// auxFloatPoint represents a combination of a point and an error for the AuxIterator. -type auxFloatPoint struct { - point *FloatPoint - err error -} - -// floatAuxIterator represents a float implementation of AuxIterator. -type floatAuxIterator struct { - input *bufFloatIterator - output chan auxFloatPoint - fields *auxIteratorFields - background bool - closer sync.Once -} - -func newFloatAuxIterator(input FloatIterator, opt IteratorOptions) *floatAuxIterator { - return &floatAuxIterator{ - input: newBufFloatIterator(input), - output: make(chan auxFloatPoint, 1), - fields: newAuxIteratorFields(opt), - } -} - -func (itr *floatAuxIterator) Background() { - itr.background = true - itr.Start() - go DrainIterator(itr) -} - -func (itr *floatAuxIterator) Start() { go itr.stream() } -func (itr *floatAuxIterator) Stats() IteratorStats { return itr.input.Stats() } - -func (itr *floatAuxIterator) Close() error { - var err error - itr.closer.Do(func() { err = itr.input.Close() }) - return err -} - -func (itr *floatAuxIterator) Next() (*FloatPoint, error) { - p := <-itr.output - return p.point, p.err -} -func (itr *floatAuxIterator) Iterator(name string, typ influxql.DataType) Iterator { - return itr.fields.iterator(name, typ) -} - -func (itr *floatAuxIterator) stream() { - for { - // Read next point. - p, err := itr.input.Next() - if err != nil { - itr.output <- auxFloatPoint{err: err} - itr.fields.sendError(err) - break - } else if p == nil { - break - } - - // Send point to output and to each field iterator. - itr.output <- auxFloatPoint{point: p} - if ok := itr.fields.send(p); !ok && itr.background { - break - } - } - - close(itr.output) - itr.fields.close() -} - -// floatChanIterator represents a new instance of floatChanIterator. -type floatChanIterator struct { - buf struct { - i int - filled bool - points [2]FloatPoint - } - err error - cond *sync.Cond - done bool -} - -func (itr *floatChanIterator) Stats() IteratorStats { return IteratorStats{} } - -func (itr *floatChanIterator) Close() error { - itr.cond.L.Lock() - // Mark the channel iterator as done and signal all waiting goroutines to start again. - itr.done = true - itr.cond.Broadcast() - // Do not defer the unlock so we don't create an unnecessary allocation. - itr.cond.L.Unlock() - return nil -} - -func (itr *floatChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Wait for either the iterator to be done (so we don't have to set the value) - // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf.filled { - itr.cond.Wait() - } - - // Do not set the value and return false to signal that the iterator is closed. - // Do this after the above wait as the above for loop may have exited because - // the iterator was closed. - if itr.done { - return false - } - - switch v := value.(type) { - case float64: - itr.buf.points[itr.buf.i] = FloatPoint{Name: name, Tags: tags, Time: time, Value: v} - - case int64: - itr.buf.points[itr.buf.i] = FloatPoint{Name: name, Tags: tags, Time: time, Value: float64(v)} - - default: - itr.buf.points[itr.buf.i] = FloatPoint{Name: name, Tags: tags, Time: time, Nil: true} - } - itr.buf.filled = true - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() - return true -} - -func (itr *floatChanIterator) setErr(err error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - itr.err = err - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() -} - -func (itr *floatChanIterator) Next() (*FloatPoint, error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Check for an error and return one if there. - if itr.err != nil { - return nil, itr.err - } - - // Wait until either a value is available in the buffer or - // the iterator is closed. - for !itr.done && !itr.buf.filled { - itr.cond.Wait() - } - - // Return nil once the channel is done and the buffer is empty. - if itr.done && !itr.buf.filled { - return nil, nil - } - - // Always read from the buffer if it exists, even if the iterator - // is closed. This prevents the last value from being truncated by - // the parent iterator. - p := &itr.buf.points[itr.buf.i] - itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) - itr.buf.filled = false - itr.cond.Signal() - return p, nil -} - // floatReduceFloatIterator executes a reducer for every interval and buffers the result. type floatReduceFloatIterator struct { input *bufFloatIterator @@ -3280,13 +3203,7 @@ func (itr *floatIteratorMapper) Next() (*FloatPoint, error) { } func (itr *floatIteratorMapper) Stats() IteratorStats { - stats := IteratorStats{} - if cur, ok := itr.cur.(*cursor); ok { - for _, itr := range cur.itrs { - stats.Add(itr.Stats()) - } - } - return stats + return itr.cur.Stats() } func (itr *floatIteratorMapper) Close() error { @@ -3904,6 +3821,95 @@ type integerSortedMergeHeapItem struct { itr IntegerIterator } +// integerIteratorScanner scans the results of a IntegerIterator into a map. +type integerIteratorScanner struct { + input *bufIntegerIterator + err error + keys []string + defaultValue interface{} +} + +// newIntegerIteratorScanner creates a new IteratorScanner. +func newIntegerIteratorScanner(input IntegerIterator, keys []string, defaultValue interface{}) *integerIteratorScanner { + return &integerIteratorScanner{ + input: newBufIntegerIterator(input), + keys: keys, + defaultValue: defaultValue, + } +} + +func (s *integerIteratorScanner) Peek() (int64, string, Tags) { + if s.err != nil { + return ZeroTime, "", Tags{} + } + + p, err := s.input.peek() + if err != nil { + s.err = err + return ZeroTime, "", Tags{} + } else if p == nil { + return ZeroTime, "", Tags{} + } + return p.Time, p.Name, p.Tags +} + +func (s *integerIteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) { + if s.err != nil { + return + } + + p, err := s.input.Next() + if err != nil { + s.err = err + return + } else if p == nil { + s.useDefaults(m) + return + } else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) { + s.useDefaults(m) + s.input.unread(p) + return + } + + if k := s.keys[0]; k != "" { + if p.Nil { + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } else { + m[k] = p.Value + } + } + for i, v := range p.Aux { + k := s.keys[i+1] + switch v.(type) { + case float64, int64, uint64, string, bool: + m[k] = v + default: + // Insert the fill value if one was specified. + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } + } +} + +func (s *integerIteratorScanner) useDefaults(m map[string]interface{}) { + if s.defaultValue == SkipDefault { + return + } + for _, k := range s.keys { + if k == "" { + continue + } + m[k] = s.defaultValue + } +} + +func (s *integerIteratorScanner) Stats() IteratorStats { return s.input.Stats() } +func (s *integerIteratorScanner) Err() error { return s.err } +func (s *integerIteratorScanner) Close() error { return s.input.Close() } + // integerParallelIterator represents an iterator that pulls data in a separate goroutine. type integerParallelIterator struct { input IntegerIterator @@ -4317,169 +4323,6 @@ func (itr *integerCloseInterruptIterator) Next() (*IntegerPoint, error) { return p, nil } -// auxIntegerPoint represents a combination of a point and an error for the AuxIterator. -type auxIntegerPoint struct { - point *IntegerPoint - err error -} - -// integerAuxIterator represents a integer implementation of AuxIterator. -type integerAuxIterator struct { - input *bufIntegerIterator - output chan auxIntegerPoint - fields *auxIteratorFields - background bool - closer sync.Once -} - -func newIntegerAuxIterator(input IntegerIterator, opt IteratorOptions) *integerAuxIterator { - return &integerAuxIterator{ - input: newBufIntegerIterator(input), - output: make(chan auxIntegerPoint, 1), - fields: newAuxIteratorFields(opt), - } -} - -func (itr *integerAuxIterator) Background() { - itr.background = true - itr.Start() - go DrainIterator(itr) -} - -func (itr *integerAuxIterator) Start() { go itr.stream() } -func (itr *integerAuxIterator) Stats() IteratorStats { return itr.input.Stats() } - -func (itr *integerAuxIterator) Close() error { - var err error - itr.closer.Do(func() { err = itr.input.Close() }) - return err -} - -func (itr *integerAuxIterator) Next() (*IntegerPoint, error) { - p := <-itr.output - return p.point, p.err -} -func (itr *integerAuxIterator) Iterator(name string, typ influxql.DataType) Iterator { - return itr.fields.iterator(name, typ) -} - -func (itr *integerAuxIterator) stream() { - for { - // Read next point. - p, err := itr.input.Next() - if err != nil { - itr.output <- auxIntegerPoint{err: err} - itr.fields.sendError(err) - break - } else if p == nil { - break - } - - // Send point to output and to each field iterator. - itr.output <- auxIntegerPoint{point: p} - if ok := itr.fields.send(p); !ok && itr.background { - break - } - } - - close(itr.output) - itr.fields.close() -} - -// integerChanIterator represents a new instance of integerChanIterator. -type integerChanIterator struct { - buf struct { - i int - filled bool - points [2]IntegerPoint - } - err error - cond *sync.Cond - done bool -} - -func (itr *integerChanIterator) Stats() IteratorStats { return IteratorStats{} } - -func (itr *integerChanIterator) Close() error { - itr.cond.L.Lock() - // Mark the channel iterator as done and signal all waiting goroutines to start again. - itr.done = true - itr.cond.Broadcast() - // Do not defer the unlock so we don't create an unnecessary allocation. - itr.cond.L.Unlock() - return nil -} - -func (itr *integerChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Wait for either the iterator to be done (so we don't have to set the value) - // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf.filled { - itr.cond.Wait() - } - - // Do not set the value and return false to signal that the iterator is closed. - // Do this after the above wait as the above for loop may have exited because - // the iterator was closed. - if itr.done { - return false - } - - switch v := value.(type) { - case int64: - itr.buf.points[itr.buf.i] = IntegerPoint{Name: name, Tags: tags, Time: time, Value: v} - - default: - itr.buf.points[itr.buf.i] = IntegerPoint{Name: name, Tags: tags, Time: time, Nil: true} - } - itr.buf.filled = true - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() - return true -} - -func (itr *integerChanIterator) setErr(err error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - itr.err = err - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() -} - -func (itr *integerChanIterator) Next() (*IntegerPoint, error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Check for an error and return one if there. - if itr.err != nil { - return nil, itr.err - } - - // Wait until either a value is available in the buffer or - // the iterator is closed. - for !itr.done && !itr.buf.filled { - itr.cond.Wait() - } - - // Return nil once the channel is done and the buffer is empty. - if itr.done && !itr.buf.filled { - return nil, nil - } - - // Always read from the buffer if it exists, even if the iterator - // is closed. This prevents the last value from being truncated by - // the parent iterator. - p := &itr.buf.points[itr.buf.i] - itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) - itr.buf.filled = false - itr.cond.Signal() - return p, nil -} - // integerReduceFloatIterator executes a reducer for every interval and buffers the result. type integerReduceFloatIterator struct { input *bufIntegerIterator @@ -6690,13 +6533,7 @@ func (itr *integerIteratorMapper) Next() (*IntegerPoint, error) { } func (itr *integerIteratorMapper) Stats() IteratorStats { - stats := IteratorStats{} - if cur, ok := itr.cur.(*cursor); ok { - for _, itr := range cur.itrs { - stats.Add(itr.Stats()) - } - } - return stats + return itr.cur.Stats() } func (itr *integerIteratorMapper) Close() error { @@ -7314,6 +7151,95 @@ type unsignedSortedMergeHeapItem struct { itr UnsignedIterator } +// unsignedIteratorScanner scans the results of a UnsignedIterator into a map. +type unsignedIteratorScanner struct { + input *bufUnsignedIterator + err error + keys []string + defaultValue interface{} +} + +// newUnsignedIteratorScanner creates a new IteratorScanner. +func newUnsignedIteratorScanner(input UnsignedIterator, keys []string, defaultValue interface{}) *unsignedIteratorScanner { + return &unsignedIteratorScanner{ + input: newBufUnsignedIterator(input), + keys: keys, + defaultValue: defaultValue, + } +} + +func (s *unsignedIteratorScanner) Peek() (int64, string, Tags) { + if s.err != nil { + return ZeroTime, "", Tags{} + } + + p, err := s.input.peek() + if err != nil { + s.err = err + return ZeroTime, "", Tags{} + } else if p == nil { + return ZeroTime, "", Tags{} + } + return p.Time, p.Name, p.Tags +} + +func (s *unsignedIteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) { + if s.err != nil { + return + } + + p, err := s.input.Next() + if err != nil { + s.err = err + return + } else if p == nil { + s.useDefaults(m) + return + } else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) { + s.useDefaults(m) + s.input.unread(p) + return + } + + if k := s.keys[0]; k != "" { + if p.Nil { + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } else { + m[k] = p.Value + } + } + for i, v := range p.Aux { + k := s.keys[i+1] + switch v.(type) { + case float64, int64, uint64, string, bool: + m[k] = v + default: + // Insert the fill value if one was specified. + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } + } +} + +func (s *unsignedIteratorScanner) useDefaults(m map[string]interface{}) { + if s.defaultValue == SkipDefault { + return + } + for _, k := range s.keys { + if k == "" { + continue + } + m[k] = s.defaultValue + } +} + +func (s *unsignedIteratorScanner) Stats() IteratorStats { return s.input.Stats() } +func (s *unsignedIteratorScanner) Err() error { return s.err } +func (s *unsignedIteratorScanner) Close() error { return s.input.Close() } + // unsignedParallelIterator represents an iterator that pulls data in a separate goroutine. type unsignedParallelIterator struct { input UnsignedIterator @@ -7727,169 +7653,6 @@ func (itr *unsignedCloseInterruptIterator) Next() (*UnsignedPoint, error) { return p, nil } -// auxUnsignedPoint represents a combination of a point and an error for the AuxIterator. -type auxUnsignedPoint struct { - point *UnsignedPoint - err error -} - -// unsignedAuxIterator represents a unsigned implementation of AuxIterator. -type unsignedAuxIterator struct { - input *bufUnsignedIterator - output chan auxUnsignedPoint - fields *auxIteratorFields - background bool - closer sync.Once -} - -func newUnsignedAuxIterator(input UnsignedIterator, opt IteratorOptions) *unsignedAuxIterator { - return &unsignedAuxIterator{ - input: newBufUnsignedIterator(input), - output: make(chan auxUnsignedPoint, 1), - fields: newAuxIteratorFields(opt), - } -} - -func (itr *unsignedAuxIterator) Background() { - itr.background = true - itr.Start() - go DrainIterator(itr) -} - -func (itr *unsignedAuxIterator) Start() { go itr.stream() } -func (itr *unsignedAuxIterator) Stats() IteratorStats { return itr.input.Stats() } - -func (itr *unsignedAuxIterator) Close() error { - var err error - itr.closer.Do(func() { err = itr.input.Close() }) - return err -} - -func (itr *unsignedAuxIterator) Next() (*UnsignedPoint, error) { - p := <-itr.output - return p.point, p.err -} -func (itr *unsignedAuxIterator) Iterator(name string, typ influxql.DataType) Iterator { - return itr.fields.iterator(name, typ) -} - -func (itr *unsignedAuxIterator) stream() { - for { - // Read next point. - p, err := itr.input.Next() - if err != nil { - itr.output <- auxUnsignedPoint{err: err} - itr.fields.sendError(err) - break - } else if p == nil { - break - } - - // Send point to output and to each field iterator. - itr.output <- auxUnsignedPoint{point: p} - if ok := itr.fields.send(p); !ok && itr.background { - break - } - } - - close(itr.output) - itr.fields.close() -} - -// unsignedChanIterator represents a new instance of unsignedChanIterator. -type unsignedChanIterator struct { - buf struct { - i int - filled bool - points [2]UnsignedPoint - } - err error - cond *sync.Cond - done bool -} - -func (itr *unsignedChanIterator) Stats() IteratorStats { return IteratorStats{} } - -func (itr *unsignedChanIterator) Close() error { - itr.cond.L.Lock() - // Mark the channel iterator as done and signal all waiting goroutines to start again. - itr.done = true - itr.cond.Broadcast() - // Do not defer the unlock so we don't create an unnecessary allocation. - itr.cond.L.Unlock() - return nil -} - -func (itr *unsignedChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Wait for either the iterator to be done (so we don't have to set the value) - // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf.filled { - itr.cond.Wait() - } - - // Do not set the value and return false to signal that the iterator is closed. - // Do this after the above wait as the above for loop may have exited because - // the iterator was closed. - if itr.done { - return false - } - - switch v := value.(type) { - case uint64: - itr.buf.points[itr.buf.i] = UnsignedPoint{Name: name, Tags: tags, Time: time, Value: v} - - default: - itr.buf.points[itr.buf.i] = UnsignedPoint{Name: name, Tags: tags, Time: time, Nil: true} - } - itr.buf.filled = true - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() - return true -} - -func (itr *unsignedChanIterator) setErr(err error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - itr.err = err - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() -} - -func (itr *unsignedChanIterator) Next() (*UnsignedPoint, error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Check for an error and return one if there. - if itr.err != nil { - return nil, itr.err - } - - // Wait until either a value is available in the buffer or - // the iterator is closed. - for !itr.done && !itr.buf.filled { - itr.cond.Wait() - } - - // Return nil once the channel is done and the buffer is empty. - if itr.done && !itr.buf.filled { - return nil, nil - } - - // Always read from the buffer if it exists, even if the iterator - // is closed. This prevents the last value from being truncated by - // the parent iterator. - p := &itr.buf.points[itr.buf.i] - itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) - itr.buf.filled = false - itr.cond.Signal() - return p, nil -} - // unsignedReduceFloatIterator executes a reducer for every interval and buffers the result. type unsignedReduceFloatIterator struct { input *bufUnsignedIterator @@ -10100,13 +9863,7 @@ func (itr *unsignedIteratorMapper) Next() (*UnsignedPoint, error) { } func (itr *unsignedIteratorMapper) Stats() IteratorStats { - stats := IteratorStats{} - if cur, ok := itr.cur.(*cursor); ok { - for _, itr := range cur.itrs { - stats.Add(itr.Stats()) - } - } - return stats + return itr.cur.Stats() } func (itr *unsignedIteratorMapper) Close() error { @@ -10724,6 +10481,95 @@ type stringSortedMergeHeapItem struct { itr StringIterator } +// stringIteratorScanner scans the results of a StringIterator into a map. +type stringIteratorScanner struct { + input *bufStringIterator + err error + keys []string + defaultValue interface{} +} + +// newStringIteratorScanner creates a new IteratorScanner. +func newStringIteratorScanner(input StringIterator, keys []string, defaultValue interface{}) *stringIteratorScanner { + return &stringIteratorScanner{ + input: newBufStringIterator(input), + keys: keys, + defaultValue: defaultValue, + } +} + +func (s *stringIteratorScanner) Peek() (int64, string, Tags) { + if s.err != nil { + return ZeroTime, "", Tags{} + } + + p, err := s.input.peek() + if err != nil { + s.err = err + return ZeroTime, "", Tags{} + } else if p == nil { + return ZeroTime, "", Tags{} + } + return p.Time, p.Name, p.Tags +} + +func (s *stringIteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) { + if s.err != nil { + return + } + + p, err := s.input.Next() + if err != nil { + s.err = err + return + } else if p == nil { + s.useDefaults(m) + return + } else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) { + s.useDefaults(m) + s.input.unread(p) + return + } + + if k := s.keys[0]; k != "" { + if p.Nil { + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } else { + m[k] = p.Value + } + } + for i, v := range p.Aux { + k := s.keys[i+1] + switch v.(type) { + case float64, int64, uint64, string, bool: + m[k] = v + default: + // Insert the fill value if one was specified. + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } + } +} + +func (s *stringIteratorScanner) useDefaults(m map[string]interface{}) { + if s.defaultValue == SkipDefault { + return + } + for _, k := range s.keys { + if k == "" { + continue + } + m[k] = s.defaultValue + } +} + +func (s *stringIteratorScanner) Stats() IteratorStats { return s.input.Stats() } +func (s *stringIteratorScanner) Err() error { return s.err } +func (s *stringIteratorScanner) Close() error { return s.input.Close() } + // stringParallelIterator represents an iterator that pulls data in a separate goroutine. type stringParallelIterator struct { input StringIterator @@ -11123,169 +10969,6 @@ func (itr *stringCloseInterruptIterator) Next() (*StringPoint, error) { return p, nil } -// auxStringPoint represents a combination of a point and an error for the AuxIterator. -type auxStringPoint struct { - point *StringPoint - err error -} - -// stringAuxIterator represents a string implementation of AuxIterator. -type stringAuxIterator struct { - input *bufStringIterator - output chan auxStringPoint - fields *auxIteratorFields - background bool - closer sync.Once -} - -func newStringAuxIterator(input StringIterator, opt IteratorOptions) *stringAuxIterator { - return &stringAuxIterator{ - input: newBufStringIterator(input), - output: make(chan auxStringPoint, 1), - fields: newAuxIteratorFields(opt), - } -} - -func (itr *stringAuxIterator) Background() { - itr.background = true - itr.Start() - go DrainIterator(itr) -} - -func (itr *stringAuxIterator) Start() { go itr.stream() } -func (itr *stringAuxIterator) Stats() IteratorStats { return itr.input.Stats() } - -func (itr *stringAuxIterator) Close() error { - var err error - itr.closer.Do(func() { err = itr.input.Close() }) - return err -} - -func (itr *stringAuxIterator) Next() (*StringPoint, error) { - p := <-itr.output - return p.point, p.err -} -func (itr *stringAuxIterator) Iterator(name string, typ influxql.DataType) Iterator { - return itr.fields.iterator(name, typ) -} - -func (itr *stringAuxIterator) stream() { - for { - // Read next point. - p, err := itr.input.Next() - if err != nil { - itr.output <- auxStringPoint{err: err} - itr.fields.sendError(err) - break - } else if p == nil { - break - } - - // Send point to output and to each field iterator. - itr.output <- auxStringPoint{point: p} - if ok := itr.fields.send(p); !ok && itr.background { - break - } - } - - close(itr.output) - itr.fields.close() -} - -// stringChanIterator represents a new instance of stringChanIterator. -type stringChanIterator struct { - buf struct { - i int - filled bool - points [2]StringPoint - } - err error - cond *sync.Cond - done bool -} - -func (itr *stringChanIterator) Stats() IteratorStats { return IteratorStats{} } - -func (itr *stringChanIterator) Close() error { - itr.cond.L.Lock() - // Mark the channel iterator as done and signal all waiting goroutines to start again. - itr.done = true - itr.cond.Broadcast() - // Do not defer the unlock so we don't create an unnecessary allocation. - itr.cond.L.Unlock() - return nil -} - -func (itr *stringChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Wait for either the iterator to be done (so we don't have to set the value) - // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf.filled { - itr.cond.Wait() - } - - // Do not set the value and return false to signal that the iterator is closed. - // Do this after the above wait as the above for loop may have exited because - // the iterator was closed. - if itr.done { - return false - } - - switch v := value.(type) { - case string: - itr.buf.points[itr.buf.i] = StringPoint{Name: name, Tags: tags, Time: time, Value: v} - - default: - itr.buf.points[itr.buf.i] = StringPoint{Name: name, Tags: tags, Time: time, Nil: true} - } - itr.buf.filled = true - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() - return true -} - -func (itr *stringChanIterator) setErr(err error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - itr.err = err - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() -} - -func (itr *stringChanIterator) Next() (*StringPoint, error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Check for an error and return one if there. - if itr.err != nil { - return nil, itr.err - } - - // Wait until either a value is available in the buffer or - // the iterator is closed. - for !itr.done && !itr.buf.filled { - itr.cond.Wait() - } - - // Return nil once the channel is done and the buffer is empty. - if itr.done && !itr.buf.filled { - return nil, nil - } - - // Always read from the buffer if it exists, even if the iterator - // is closed. This prevents the last value from being truncated by - // the parent iterator. - p := &itr.buf.points[itr.buf.i] - itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) - itr.buf.filled = false - itr.cond.Signal() - return p, nil -} - // stringReduceFloatIterator executes a reducer for every interval and buffers the result. type stringReduceFloatIterator struct { input *bufStringIterator @@ -13496,13 +13179,7 @@ func (itr *stringIteratorMapper) Next() (*StringPoint, error) { } func (itr *stringIteratorMapper) Stats() IteratorStats { - stats := IteratorStats{} - if cur, ok := itr.cur.(*cursor); ok { - for _, itr := range cur.itrs { - stats.Add(itr.Stats()) - } - } - return stats + return itr.cur.Stats() } func (itr *stringIteratorMapper) Close() error { @@ -14120,6 +13797,95 @@ type booleanSortedMergeHeapItem struct { itr BooleanIterator } +// booleanIteratorScanner scans the results of a BooleanIterator into a map. +type booleanIteratorScanner struct { + input *bufBooleanIterator + err error + keys []string + defaultValue interface{} +} + +// newBooleanIteratorScanner creates a new IteratorScanner. +func newBooleanIteratorScanner(input BooleanIterator, keys []string, defaultValue interface{}) *booleanIteratorScanner { + return &booleanIteratorScanner{ + input: newBufBooleanIterator(input), + keys: keys, + defaultValue: defaultValue, + } +} + +func (s *booleanIteratorScanner) Peek() (int64, string, Tags) { + if s.err != nil { + return ZeroTime, "", Tags{} + } + + p, err := s.input.peek() + if err != nil { + s.err = err + return ZeroTime, "", Tags{} + } else if p == nil { + return ZeroTime, "", Tags{} + } + return p.Time, p.Name, p.Tags +} + +func (s *booleanIteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) { + if s.err != nil { + return + } + + p, err := s.input.Next() + if err != nil { + s.err = err + return + } else if p == nil { + s.useDefaults(m) + return + } else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) { + s.useDefaults(m) + s.input.unread(p) + return + } + + if k := s.keys[0]; k != "" { + if p.Nil { + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } else { + m[k] = p.Value + } + } + for i, v := range p.Aux { + k := s.keys[i+1] + switch v.(type) { + case float64, int64, uint64, string, bool: + m[k] = v + default: + // Insert the fill value if one was specified. + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } + } +} + +func (s *booleanIteratorScanner) useDefaults(m map[string]interface{}) { + if s.defaultValue == SkipDefault { + return + } + for _, k := range s.keys { + if k == "" { + continue + } + m[k] = s.defaultValue + } +} + +func (s *booleanIteratorScanner) Stats() IteratorStats { return s.input.Stats() } +func (s *booleanIteratorScanner) Err() error { return s.err } +func (s *booleanIteratorScanner) Close() error { return s.input.Close() } + // booleanParallelIterator represents an iterator that pulls data in a separate goroutine. type booleanParallelIterator struct { input BooleanIterator @@ -14519,169 +14285,6 @@ func (itr *booleanCloseInterruptIterator) Next() (*BooleanPoint, error) { return p, nil } -// auxBooleanPoint represents a combination of a point and an error for the AuxIterator. -type auxBooleanPoint struct { - point *BooleanPoint - err error -} - -// booleanAuxIterator represents a boolean implementation of AuxIterator. -type booleanAuxIterator struct { - input *bufBooleanIterator - output chan auxBooleanPoint - fields *auxIteratorFields - background bool - closer sync.Once -} - -func newBooleanAuxIterator(input BooleanIterator, opt IteratorOptions) *booleanAuxIterator { - return &booleanAuxIterator{ - input: newBufBooleanIterator(input), - output: make(chan auxBooleanPoint, 1), - fields: newAuxIteratorFields(opt), - } -} - -func (itr *booleanAuxIterator) Background() { - itr.background = true - itr.Start() - go DrainIterator(itr) -} - -func (itr *booleanAuxIterator) Start() { go itr.stream() } -func (itr *booleanAuxIterator) Stats() IteratorStats { return itr.input.Stats() } - -func (itr *booleanAuxIterator) Close() error { - var err error - itr.closer.Do(func() { err = itr.input.Close() }) - return err -} - -func (itr *booleanAuxIterator) Next() (*BooleanPoint, error) { - p := <-itr.output - return p.point, p.err -} -func (itr *booleanAuxIterator) Iterator(name string, typ influxql.DataType) Iterator { - return itr.fields.iterator(name, typ) -} - -func (itr *booleanAuxIterator) stream() { - for { - // Read next point. - p, err := itr.input.Next() - if err != nil { - itr.output <- auxBooleanPoint{err: err} - itr.fields.sendError(err) - break - } else if p == nil { - break - } - - // Send point to output and to each field iterator. - itr.output <- auxBooleanPoint{point: p} - if ok := itr.fields.send(p); !ok && itr.background { - break - } - } - - close(itr.output) - itr.fields.close() -} - -// booleanChanIterator represents a new instance of booleanChanIterator. -type booleanChanIterator struct { - buf struct { - i int - filled bool - points [2]BooleanPoint - } - err error - cond *sync.Cond - done bool -} - -func (itr *booleanChanIterator) Stats() IteratorStats { return IteratorStats{} } - -func (itr *booleanChanIterator) Close() error { - itr.cond.L.Lock() - // Mark the channel iterator as done and signal all waiting goroutines to start again. - itr.done = true - itr.cond.Broadcast() - // Do not defer the unlock so we don't create an unnecessary allocation. - itr.cond.L.Unlock() - return nil -} - -func (itr *booleanChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Wait for either the iterator to be done (so we don't have to set the value) - // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf.filled { - itr.cond.Wait() - } - - // Do not set the value and return false to signal that the iterator is closed. - // Do this after the above wait as the above for loop may have exited because - // the iterator was closed. - if itr.done { - return false - } - - switch v := value.(type) { - case bool: - itr.buf.points[itr.buf.i] = BooleanPoint{Name: name, Tags: tags, Time: time, Value: v} - - default: - itr.buf.points[itr.buf.i] = BooleanPoint{Name: name, Tags: tags, Time: time, Nil: true} - } - itr.buf.filled = true - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() - return true -} - -func (itr *booleanChanIterator) setErr(err error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - itr.err = err - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() -} - -func (itr *booleanChanIterator) Next() (*BooleanPoint, error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Check for an error and return one if there. - if itr.err != nil { - return nil, itr.err - } - - // Wait until either a value is available in the buffer or - // the iterator is closed. - for !itr.done && !itr.buf.filled { - itr.cond.Wait() - } - - // Return nil once the channel is done and the buffer is empty. - if itr.done && !itr.buf.filled { - return nil, nil - } - - // Always read from the buffer if it exists, even if the iterator - // is closed. This prevents the last value from being truncated by - // the parent iterator. - p := &itr.buf.points[itr.buf.i] - itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) - itr.buf.filled = false - itr.cond.Signal() - return p, nil -} - // booleanReduceFloatIterator executes a reducer for every interval and buffers the result. type booleanReduceFloatIterator struct { input *bufBooleanIterator @@ -16892,13 +16495,7 @@ func (itr *booleanIteratorMapper) Next() (*BooleanPoint, error) { } func (itr *booleanIteratorMapper) Stats() IteratorStats { - stats := IteratorStats{} - if cur, ok := itr.cur.(*cursor); ok { - for _, itr := range cur.itrs { - stats.Add(itr.Stats()) - } - } - return stats + return itr.cur.Stats() } func (itr *booleanIteratorMapper) Close() error { diff --git a/query/iterator.gen.go.tmpl b/query/iterator.gen.go.tmpl index 02ea0657bc..0e78ca2562 100644 --- a/query/iterator.gen.go.tmpl +++ b/query/iterator.gen.go.tmpl @@ -490,6 +490,95 @@ type {{$k.name}}SortedMergeHeapItem struct { itr {{$k.Name}}Iterator } +// {{$k.name}}IteratorScanner scans the results of a {{$k.Name}}Iterator into a map. +type {{$k.name}}IteratorScanner struct { + input *buf{{$k.Name}}Iterator + err error + keys []string + defaultValue interface{} +} + +// new{{$k.Name}}IteratorScanner creates a new IteratorScanner. +func new{{$k.Name}}IteratorScanner(input {{$k.Name}}Iterator, keys []string, defaultValue interface{}) *{{$k.name}}IteratorScanner { + return &{{$k.name}}IteratorScanner{ + input: newBuf{{$k.Name}}Iterator(input), + keys: keys, + defaultValue: defaultValue, + } +} + +func (s *{{$k.name}}IteratorScanner) Peek() (int64, string, Tags) { + if s.err != nil { + return ZeroTime, "", Tags{} + } + + p, err := s.input.peek() + if err != nil { + s.err = err + return ZeroTime, "", Tags{} + } else if p == nil { + return ZeroTime, "", Tags{} + } + return p.Time, p.Name, p.Tags +} + +func (s *{{$k.name}}IteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) { + if s.err != nil { + return + } + + p, err := s.input.Next() + if err != nil { + s.err = err + return + } else if p == nil { + s.useDefaults(m) + return + } else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) { + s.useDefaults(m) + s.input.unread(p) + return + } + + if k := s.keys[0]; k != "" { + if p.Nil { + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } else { + m[k] = p.Value + } + } + for i, v := range p.Aux { + k := s.keys[i+1] + switch v.(type) { + case float64, int64, uint64, string, bool: + m[k] = v + default: + // Insert the fill value if one was specified. + if s.defaultValue != SkipDefault { + m[k] = s.defaultValue + } + } + } +} + +func (s *{{$k.name}}IteratorScanner) useDefaults(m map[string]interface{}) { + if s.defaultValue == SkipDefault { + return + } + for _, k := range s.keys { + if k == "" { + continue + } + m[k] = s.defaultValue + } +} + +func (s *{{$k.name}}IteratorScanner) Stats() IteratorStats { return s.input.Stats() } +func (s *{{$k.name}}IteratorScanner) Err() error { return s.err } +func (s *{{$k.name}}IteratorScanner) Close() error { return s.input.Close() } + // {{$k.name}}ParallelIterator represents an iterator that pulls data in a separate goroutine. type {{$k.name}}ParallelIterator struct { input {{$k.Name}}Iterator @@ -906,170 +995,6 @@ func (itr *{{$k.name}}CloseInterruptIterator) Next() (*{{$k.Name}}Point, error) return p, nil } -// aux{{$k.Name}}Point represents a combination of a point and an error for the AuxIterator. -type aux{{$k.Name}}Point struct { - point *{{$k.Name}}Point - err error -} - -// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator. -type {{$k.name}}AuxIterator struct { - input *buf{{$k.Name}}Iterator - output chan aux{{$k.Name}}Point - fields *auxIteratorFields - background bool - closer sync.Once -} - -func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, opt IteratorOptions) *{{$k.name}}AuxIterator { - return &{{$k.name}}AuxIterator{ - input: newBuf{{$k.Name}}Iterator(input), - output: make(chan aux{{$k.Name}}Point, 1), - fields: newAuxIteratorFields(opt), - } -} - -func (itr *{{$k.name}}AuxIterator) Background() { - itr.background = true - itr.Start() - go DrainIterator(itr) -} - -func (itr *{{$k.name}}AuxIterator) Start() { go itr.stream() } -func (itr *{{$k.name}}AuxIterator) Stats() IteratorStats { return itr.input.Stats() } - -func (itr *{{$k.name}}AuxIterator) Close() error { - var err error - itr.closer.Do(func() { err = itr.input.Close() }) - return err -} - -func (itr *{{$k.name}}AuxIterator) Next() (*{{$k.Name}}Point, error) { - p := <-itr.output - return p.point, p.err -} -func (itr *{{$k.name}}AuxIterator) Iterator(name string, typ influxql.DataType) Iterator { return itr.fields.iterator(name, typ) } - -func (itr *{{.name}}AuxIterator) stream() { - for { - // Read next point. - p, err := itr.input.Next() - if err != nil { - itr.output <- aux{{$k.Name}}Point{err: err} - itr.fields.sendError(err) - break - } else if p == nil { - break - } - - // Send point to output and to each field iterator. - itr.output <- aux{{$k.Name}}Point{point: p} - if ok := itr.fields.send(p); !ok && itr.background { - break - } - } - - close(itr.output) - itr.fields.close() -} - -// {{$k.name}}ChanIterator represents a new instance of {{$k.name}}ChanIterator. -type {{$k.name}}ChanIterator struct { - buf struct { - i int - filled bool - points [2]{{$k.Name}}Point - } - err error - cond *sync.Cond - done bool -} - -func (itr *{{$k.name}}ChanIterator) Stats() IteratorStats { return IteratorStats{} } - -func (itr *{{$k.name}}ChanIterator) Close() error { - itr.cond.L.Lock() - // Mark the channel iterator as done and signal all waiting goroutines to start again. - itr.done = true - itr.cond.Broadcast() - // Do not defer the unlock so we don't create an unnecessary allocation. - itr.cond.L.Unlock() - return nil -} - -func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Wait for either the iterator to be done (so we don't have to set the value) - // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf.filled { - itr.cond.Wait() - } - - // Do not set the value and return false to signal that the iterator is closed. - // Do this after the above wait as the above for loop may have exited because - // the iterator was closed. - if itr.done { - return false - } - - switch v := value.(type) { - case {{$k.Type}}: - itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: v} -{{if eq $k.Name "Float"}} - case int64: - itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: float64(v)} -{{end}} - default: - itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Nil: true} - } - itr.buf.filled = true - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() - return true -} - -func (itr *{{$k.name}}ChanIterator) setErr(err error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - itr.err = err - - // Signal to all waiting goroutines that a new value is ready to read. - itr.cond.Signal() -} - -func (itr *{{$k.name}}ChanIterator) Next() (*{{$k.Name}}Point, error) { - itr.cond.L.Lock() - defer itr.cond.L.Unlock() - - // Check for an error and return one if there. - if itr.err != nil { - return nil, itr.err - } - - // Wait until either a value is available in the buffer or - // the iterator is closed. - for !itr.done && !itr.buf.filled { - itr.cond.Wait() - } - - // Return nil once the channel is done and the buffer is empty. - if itr.done && !itr.buf.filled { - return nil, nil - } - - // Always read from the buffer if it exists, even if the iterator - // is closed. This prevents the last value from being truncated by - // the parent iterator. - p := &itr.buf.points[itr.buf.i] - itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) - itr.buf.filled = false - itr.cond.Signal() - return p, nil -} - {{range $v := $types}} // {{$k.name}}Reduce{{$v.Name}}Iterator executes a reducer for every interval and buffers the result. @@ -1620,13 +1545,7 @@ func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) { } func (itr *{{$k.name}}IteratorMapper) Stats() IteratorStats { - stats := IteratorStats{} - if cur, ok := itr.cur.(*cursor); ok { - for _, itr := range cur.itrs { - stats.Add(itr.Stats()) - } - } - return stats + return itr.cur.Stats() } func (itr *{{$k.name}}IteratorMapper) Close() error { diff --git a/query/iterator.go b/query/iterator.go index 8e22963012..2602fe3d4f 100644 --- a/query/iterator.go +++ b/query/iterator.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "regexp" - "sync" "time" "github.com/gogo/protobuf/proto" @@ -404,175 +403,43 @@ func NewCloseInterruptIterator(input Iterator, closing <-chan struct{}) Iterator } } -// AuxIterator represents an iterator that can split off separate auxiliary iterators. -type AuxIterator interface { - Iterator +// IteratorScanner is used to scan the results of an iterator into a map. +type IteratorScanner interface { + // Peek retrieves information about the next point. It returns a timestamp, the name, and the tags. + Peek() (int64, string, Tags) - // Auxilary iterator - Iterator(name string, typ influxql.DataType) Iterator + // ScanAt will take a time, name, and tags and scan the point that matches those into the map. + ScanAt(ts int64, name string, tags Tags, values map[string]interface{}) - // Start starts writing to the created iterators. - Start() + // Stats returns the IteratorStats from the Iterator. + Stats() IteratorStats - // Backgrounds the iterator so that, when start is called, it will - // continuously read from the iterator. - Background() + // Err returns an error that was encountered while scanning. + Err() error + + io.Closer } -// NewAuxIterator returns a new instance of AuxIterator. -func NewAuxIterator(input Iterator, opt IteratorOptions) AuxIterator { +// SkipDefault is a sentinel value to tell the IteratorScanner to skip setting the +// default value if none was present. This causes the map to use the previous value +// if it was previously set. +var SkipDefault = interface{}(0) + +// NewIteratorScanner produces an IteratorScanner for the Iterator. +func NewIteratorScanner(input Iterator, keys []string, defaultValue interface{}) IteratorScanner { switch input := input.(type) { case FloatIterator: - return newFloatAuxIterator(input, opt) + return newFloatIteratorScanner(input, keys, defaultValue) case IntegerIterator: - return newIntegerAuxIterator(input, opt) + return newIntegerIteratorScanner(input, keys, defaultValue) case UnsignedIterator: - return newUnsignedAuxIterator(input, opt) + return newUnsignedIteratorScanner(input, keys, defaultValue) case StringIterator: - return newStringAuxIterator(input, opt) + return newStringIteratorScanner(input, keys, defaultValue) case BooleanIterator: - return newBooleanAuxIterator(input, opt) + return newBooleanIteratorScanner(input, keys, defaultValue) default: - panic(fmt.Sprintf("unsupported aux iterator type: %T", input)) - } -} - -// auxIteratorField represents an auxilary field within an AuxIterator. -type auxIteratorField struct { - name string // field name - typ influxql.DataType // detected data type - itrs []Iterator // auxillary iterators - mu sync.Mutex - opt IteratorOptions -} - -func (f *auxIteratorField) append(itr Iterator) { - f.mu.Lock() - defer f.mu.Unlock() - f.itrs = append(f.itrs, itr) -} - -func (f *auxIteratorField) close() { - f.mu.Lock() - defer f.mu.Unlock() - for _, itr := range f.itrs { - itr.Close() - } -} - -type auxIteratorFields struct { - fields []*auxIteratorField - dimensions []string -} - -// newAuxIteratorFields returns a new instance of auxIteratorFields from a list of field names. -func newAuxIteratorFields(opt IteratorOptions) *auxIteratorFields { - fields := make([]*auxIteratorField, len(opt.Aux)) - for i, ref := range opt.Aux { - fields[i] = &auxIteratorField{name: ref.Val, typ: ref.Type, opt: opt} - } - return &auxIteratorFields{ - fields: fields, - dimensions: opt.GetDimensions(), - } -} - -func (a *auxIteratorFields) close() { - for _, f := range a.fields { - f.close() - } -} - -// iterator creates a new iterator for a named auxilary field. -func (a *auxIteratorFields) iterator(name string, typ influxql.DataType) Iterator { - for _, f := range a.fields { - // Skip field if it's name doesn't match. - // Exit if no points were received by the iterator. - if f.name != name || (typ != influxql.Unknown && f.typ != typ) { - continue - } - - // Create channel iterator by data type. - switch f.typ { - case influxql.Float: - itr := &floatChanIterator{cond: sync.NewCond(&sync.Mutex{})} - f.append(itr) - return itr - case influxql.Integer: - itr := &integerChanIterator{cond: sync.NewCond(&sync.Mutex{})} - f.append(itr) - return itr - case influxql.Unsigned: - itr := &unsignedChanIterator{cond: sync.NewCond(&sync.Mutex{})} - f.append(itr) - return itr - case influxql.String, influxql.Tag: - itr := &stringChanIterator{cond: sync.NewCond(&sync.Mutex{})} - f.append(itr) - return itr - case influxql.Boolean: - itr := &booleanChanIterator{cond: sync.NewCond(&sync.Mutex{})} - f.append(itr) - return itr - default: - } - } - - return &nilFloatIterator{} -} - -// send sends a point to all field iterators. -func (a *auxIteratorFields) send(p Point) (ok bool) { - values := p.aux() - for i, f := range a.fields { - var v interface{} - if i < len(values) { - v = values[i] - } - - tags := p.tags() - tags = tags.Subset(a.dimensions) - - // Send new point for each aux iterator. - // Primitive pointers represent nil values. - for _, itr := range f.itrs { - switch itr := itr.(type) { - case *floatChanIterator: - ok = itr.setBuf(p.name(), tags, p.time(), v) || ok - case *integerChanIterator: - ok = itr.setBuf(p.name(), tags, p.time(), v) || ok - case *unsignedChanIterator: - ok = itr.setBuf(p.name(), tags, p.time(), v) || ok - case *stringChanIterator: - ok = itr.setBuf(p.name(), tags, p.time(), v) || ok - case *booleanChanIterator: - ok = itr.setBuf(p.name(), tags, p.time(), v) || ok - default: - panic(fmt.Sprintf("invalid aux itr type: %T", itr)) - } - } - } - return ok -} - -func (a *auxIteratorFields) sendError(err error) { - for _, f := range a.fields { - for _, itr := range f.itrs { - switch itr := itr.(type) { - case *floatChanIterator: - itr.setErr(err) - case *integerChanIterator: - itr.setErr(err) - case *unsignedChanIterator: - itr.setErr(err) - case *stringChanIterator: - itr.setErr(err) - case *booleanChanIterator: - itr.setErr(err) - default: - panic(fmt.Sprintf("invalid aux itr type: %T", itr)) - } - } + panic(fmt.Sprintf("unsupported type for iterator scanner: %T", input)) } } @@ -1231,50 +1098,6 @@ func decodeMeasurement(pb *internal.Measurement) (*influxql.Measurement, error) return mm, nil } -// selectInfo represents an object that stores info about select fields. -type selectInfo struct { - calls map[*influxql.Call]struct{} - refs map[*influxql.VarRef]struct{} -} - -// newSelectInfo creates a object with call and var ref info from stmt. -func newSelectInfo(stmt *influxql.SelectStatement) *selectInfo { - info := &selectInfo{ - calls: make(map[*influxql.Call]struct{}), - refs: make(map[*influxql.VarRef]struct{}), - } - influxql.Walk(info, stmt.Fields) - return info -} - -func (v *selectInfo) Visit(n influxql.Node) influxql.Visitor { - switch n := n.(type) { - case *influxql.Call: - v.calls[n] = struct{}{} - return nil - case *influxql.VarRef: - v.refs[n] = struct{}{} - return nil - } - return v -} - -// FindSelector returns a selector from the selectInfo. This will only -// return a selector if the Call is a selector and it's the only function -// in the selectInfo. -func (v *selectInfo) FindSelector() *influxql.Call { - if len(v.calls) != 1 { - return nil - } - - for s := range v.calls { - if influxql.IsSelector(s) { - return s - } - } - return nil -} - // Interval represents a repeating interval for a query. type Interval struct { Duration time.Duration @@ -1332,79 +1155,6 @@ func (itr *nilFloatReaderIterator) Close() error { } func (*nilFloatReaderIterator) Next() (*FloatPoint, error) { return nil, nil } -// integerFloatTransformIterator executes a function to modify an existing point for every -// output of the input iterator. -type integerFloatTransformIterator struct { - input IntegerIterator - fn integerFloatTransformFunc -} - -// Stats returns stats from the input iterator. -func (itr *integerFloatTransformIterator) Stats() IteratorStats { return itr.input.Stats() } - -// Close closes the iterator and all child iterators. -func (itr *integerFloatTransformIterator) Close() error { return itr.input.Close() } - -// Next returns the minimum value for the next available interval. -func (itr *integerFloatTransformIterator) Next() (*FloatPoint, error) { - p, err := itr.input.Next() - if err != nil { - return nil, err - } else if p != nil { - return itr.fn(p), nil - } - return nil, nil -} - -// integerFloatTransformFunc creates or modifies a point. -// The point passed in may be modified and returned rather than allocating a -// new point if possible. -type integerFloatTransformFunc func(p *IntegerPoint) *FloatPoint - -type integerFloatCastIterator struct { - input IntegerIterator - point FloatPoint -} - -func (itr *integerFloatCastIterator) Stats() IteratorStats { return itr.input.Stats() } -func (itr *integerFloatCastIterator) Close() error { return itr.input.Close() } -func (itr *integerFloatCastIterator) Next() (*FloatPoint, error) { - p, err := itr.input.Next() - if p == nil || err != nil { - return nil, err - } - - itr.point.Name = p.Name - itr.point.Tags = p.Tags - itr.point.Time = p.Time - itr.point.Nil = p.Nil - itr.point.Value = float64(p.Value) - itr.point.Aux = p.Aux - return &itr.point, nil -} - -type unsignedFloatCastIterator struct { - input UnsignedIterator - point FloatPoint -} - -func (itr *unsignedFloatCastIterator) Stats() IteratorStats { return itr.input.Stats() } -func (itr *unsignedFloatCastIterator) Close() error { return itr.input.Close() } -func (itr *unsignedFloatCastIterator) Next() (*FloatPoint, error) { - p, err := itr.input.Next() - if p == nil || err != nil { - return nil, err - } - - itr.point.Name = p.Name - itr.point.Tags = p.Tags - itr.point.Time = p.Time - itr.point.Nil = p.Nil - itr.point.Value = float64(p.Value) - itr.point.Aux = p.Aux - return &itr.point, nil -} - // IteratorStats represents statistics about an iterator. // Some statistics are available immediately upon iterator creation while // some are derived as the iterator processes data. diff --git a/query/iterator_test.go b/query/iterator_test.go index 59ef8e3874..42663e0944 100644 --- a/query/iterator_test.go +++ b/query/iterator_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "math" "reflect" "strings" "testing" @@ -770,44 +769,6 @@ func TestLimitIterator_Boolean(t *testing.T) { } } -// Ensure auxiliary iterators can be created for auxilary fields. -func TestFloatAuxIterator(t *testing.T) { - itr := query.NewAuxIterator( - &FloatIterator{Points: []query.FloatPoint{ - {Time: 0, Value: 1, Aux: []interface{}{float64(100), float64(200)}}, - {Time: 1, Value: 2, Aux: []interface{}{float64(500), math.NaN()}}, - }}, - query.IteratorOptions{Aux: []influxql.VarRef{{Val: "f0", Type: influxql.Float}, {Val: "f1", Type: influxql.Float}}}, - ) - - itrs := []query.Iterator{ - itr, - itr.Iterator("f0", influxql.Unknown), - itr.Iterator("f1", influxql.Unknown), - itr.Iterator("f0", influxql.Unknown), - } - itr.Start() - - if a, err := Iterators(itrs).ReadAll(); err != nil { - t.Fatalf("unexpected error: %s", err) - } else if !deep.Equal(a, [][]query.Point{ - { - &query.FloatPoint{Time: 0, Value: 1, Aux: []interface{}{float64(100), float64(200)}}, - &query.FloatPoint{Time: 0, Value: float64(100)}, - &query.FloatPoint{Time: 0, Value: float64(200)}, - &query.FloatPoint{Time: 0, Value: float64(100)}, - }, - { - &query.FloatPoint{Time: 1, Value: 2, Aux: []interface{}{float64(500), math.NaN()}}, - &query.FloatPoint{Time: 1, Value: float64(500)}, - &query.FloatPoint{Time: 1, Value: math.NaN()}, - &query.FloatPoint{Time: 1, Value: float64(500)}, - }, - }) { - t.Fatalf("unexpected points: %s", spew.Sdump(a)) - } -} - // Ensure limit iterator returns a subset of points. func TestLimitIterator(t *testing.T) { itr := query.NewLimitIterator( diff --git a/query/math.go b/query/math.go new file mode 100644 index 0000000000..1de89517f8 --- /dev/null +++ b/query/math.go @@ -0,0 +1,47 @@ +package query + +import ( + "time" + + "github.com/influxdata/influxql" +) + +func isMathFunction(call *influxql.Call) bool { + return false +} + +type MathTypeMapper struct{} + +func (MathTypeMapper) MapType(measurement *influxql.Measurement, field string) influxql.DataType { + return influxql.Unknown +} + +func (MathTypeMapper) CallType(name string, args []influxql.DataType) (influxql.DataType, error) { + // TODO(jsternberg): Put math function call types here. + return influxql.Unknown, nil +} + +type MathValuer struct { + Valuer influxql.Valuer +} + +func (v *MathValuer) Value(key string) (interface{}, bool) { + if v.Valuer != nil { + return v.Valuer.Value(key) + } + return nil, false +} + +func (v *MathValuer) Call(name string, args []influxql.Expr) (interface{}, bool) { + if v, ok := v.Valuer.(influxql.CallValuer); ok { + return v.Call(name, args) + } + return nil, false +} + +func (v *MathValuer) Zone() *time.Location { + if v, ok := v.Valuer.(influxql.ZoneValuer); ok { + return v.Zone() + } + return nil +} diff --git a/query/monitor.go b/query/monitor.go index c3c254eea8..8e76be751d 100644 --- a/query/monitor.go +++ b/query/monitor.go @@ -29,14 +29,14 @@ func MonitorFromContext(ctx context.Context) Monitor { // PointLimitMonitor is a query monitor that exits when the number of points // emitted exceeds a threshold. -func PointLimitMonitor(itrs Iterators, interval time.Duration, limit int) MonitorFunc { +func PointLimitMonitor(cur Cursor, interval time.Duration, limit int) MonitorFunc { return func(closing <-chan struct{}) error { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: - stats := itrs.Stats() + stats := cur.Stats() if stats.PointN >= limit { return ErrMaxSelectPointsLimitExceeded(stats.PointN, limit) } diff --git a/query/select.go b/query/select.go index dae99d6687..8e7be29e9f 100644 --- a/query/select.go +++ b/query/select.go @@ -2,14 +2,11 @@ package query import ( "context" - "errors" "fmt" "io" - "math" "sort" "time" - "github.com/influxdata/influxdb/pkg/tracing" "github.com/influxdata/influxql" ) @@ -115,58 +112,16 @@ func (p *preparedStatement) Select(ctx context.Context) (Cursor, error) { opt := p.opt opt.InterruptCh = ctx.Done() - itrs, err := buildIterators(ctx, p.stmt, p.ic, opt) + cur, err := buildCursor(ctx, p.stmt, p.ic, opt) if err != nil { return nil, err } - columns := make([]influxql.VarRef, len(p.columns)) - offset := 0 - if !p.stmt.OmitTime { - columns[0] = influxql.VarRef{ - Val: p.columns[0], - Type: influxql.Time, - } - offset++ - } - - valuer := influxql.TypeValuerEval{ - TypeMapper: DefaultTypeMapper, - } - for i, f := range p.stmt.Fields { - typ, _ := valuer.EvalType(f.Expr) - columns[i+offset] = influxql.VarRef{ - Val: p.columns[i+offset], - Type: typ, - } - - if p.stmt.Target != nil { - continue - } - - if call, ok := f.Expr.(*influxql.Call); ok && (call.Name == "top" || call.Name == "bottom") { - for j := 1; j < len(call.Args)-1; j++ { - offset++ - typ, _ := valuer.EvalType(call.Args[j]) - columns[i+offset] = influxql.VarRef{ - Val: p.columns[i+offset], - Type: typ, - } - } - } - } - - cur := newCursor(itrs, columns, p.opt.Ascending) - cur.omitTime = p.stmt.OmitTime - if p.stmt.Location != nil { - cur.loc = p.stmt.Location - } - // If a monitor exists and we are told there is a maximum number of points, // register the monitor function. if m := MonitorFromContext(ctx); m != nil { if p.maxPointN > 0 { - monitor := PointLimitMonitor(itrs, DefaultStatsInterval, p.maxPointN) + monitor := PointLimitMonitor(cur, DefaultStatsInterval, p.maxPointN) m.Monitor(monitor) } } @@ -177,300 +132,6 @@ func (p *preparedStatement) Close() error { return p.ic.Close() } -func buildIterators(ctx context.Context, stmt *influxql.SelectStatement, ic IteratorCreator, opt IteratorOptions) ([]Iterator, error) { - span := tracing.SpanFromContext(ctx) - // Retrieve refs for each call and var ref. - info := newSelectInfo(stmt) - if len(info.calls) > 1 && len(info.refs) > 0 { - return nil, errors.New("cannot select fields when selecting multiple aggregates") - } - - // Determine auxiliary fields to be selected. - opt.Aux = make([]influxql.VarRef, 0, len(info.refs)) - for ref := range info.refs { - opt.Aux = append(opt.Aux, *ref) - } - sort.Sort(influxql.VarRefs(opt.Aux)) - - // If there are multiple auxilary fields and no calls then construct an aux iterator. - if len(info.calls) == 0 && len(info.refs) > 0 { - if span != nil { - span = span.StartSpan("auxiliary_iterators") - defer span.Finish() - - span.SetLabels("statement", stmt.String()) - ctx = tracing.NewContextWithSpan(ctx, span) - } - return buildAuxIterators(ctx, stmt.Fields, ic, stmt.Sources, opt) - } - - if span != nil { - span = span.StartSpan("field_iterators") - defer span.Finish() - - span.SetLabels("statement", stmt.String()) - ctx = tracing.NewContextWithSpan(ctx, span) - } - - // Include auxiliary fields from top() and bottom() when not writing the results. - fields := stmt.Fields - if stmt.Target == nil { - extraFields := 0 - for call := range info.calls { - if call.Name == "top" || call.Name == "bottom" { - for i := 1; i < len(call.Args)-1; i++ { - ref := call.Args[i].(*influxql.VarRef) - opt.Aux = append(opt.Aux, *ref) - extraFields++ - } - } - } - - if extraFields > 0 { - // Rebuild the list of fields if any extra fields are being implicitly added - fields = make([]*influxql.Field, 0, len(stmt.Fields)+extraFields) - for _, f := range stmt.Fields { - fields = append(fields, f) - switch expr := f.Expr.(type) { - case *influxql.Call: - if expr.Name == "top" || expr.Name == "bottom" { - for i := 1; i < len(expr.Args)-1; i++ { - fields = append(fields, &influxql.Field{Expr: expr.Args[i]}) - } - } - } - } - } - } - - // Determine if there is one call and it is a selector. - selector := false - if len(info.calls) == 1 { - for call := range info.calls { - selector = influxql.IsSelector(call) - } - } - - return buildFieldIterators(ctx, fields, ic, stmt.Sources, opt, selector, stmt.Target != nil) -} - -// buildAuxIterators creates a set of iterators from a single combined auxiliary iterator. -func buildAuxIterators(ctx context.Context, fields influxql.Fields, ic IteratorCreator, sources influxql.Sources, opt IteratorOptions) ([]Iterator, error) { - // Create the auxiliary iterators for each source. - inputs := make([]Iterator, 0, len(sources)) - if err := func() error { - for _, source := range sources { - switch source := source.(type) { - case *influxql.Measurement: - input, err := ic.CreateIterator(ctx, source, opt) - if err != nil { - return err - } - inputs = append(inputs, input) - case *influxql.SubQuery: - b := subqueryBuilder{ - ic: ic, - stmt: source.Statement, - } - - input, err := b.buildAuxIterator(ctx, opt) - if err != nil { - return err - } - inputs = append(inputs, input) - } - } - return nil - }(); err != nil { - Iterators(inputs).Close() - return nil, err - } - - // Merge iterators to read auxilary fields. - input, err := Iterators(inputs).Merge(opt) - if err != nil { - Iterators(inputs).Close() - return nil, err - } else if input == nil { - input = &nilFloatIterator{} - } - - // Filter out duplicate rows, if required. - if opt.Dedupe { - // If there is no group by and it is a float iterator, see if we can use a fast dedupe. - if itr, ok := input.(FloatIterator); ok && len(opt.Dimensions) == 0 { - if sz := len(fields); sz > 0 && sz < 3 { - input = newFloatFastDedupeIterator(itr) - } else { - input = NewDedupeIterator(itr) - } - } else { - input = NewDedupeIterator(input) - } - } - // Apply limit & offset. - if opt.Limit > 0 || opt.Offset > 0 { - input = NewLimitIterator(input, opt) - } - - // Wrap in an auxiliary iterator to separate the fields. - aitr := NewAuxIterator(input, opt) - tryAddAuxIteratorToContext(ctx, aitr) - - // Generate iterators for each field. - itrs := make([]Iterator, len(fields)) - if err := func() error { - for i, f := range fields { - expr := influxql.Reduce(f.Expr, nil) - itr, err := buildAuxIterator(expr, aitr, opt) - if err != nil { - return err - } - itrs[i] = itr - } - return nil - }(); err != nil { - Iterators(Iterators(itrs).filterNonNil()).Close() - aitr.Close() - return nil, err - } - - // Background the primary iterator since there is no reader for it. - aitr.Background() - - return itrs, nil -} - -// buildAuxIterator constructs an Iterator for an expression from an AuxIterator. -func buildAuxIterator(expr influxql.Expr, aitr AuxIterator, opt IteratorOptions) (Iterator, error) { - switch expr := expr.(type) { - case *influxql.VarRef: - return aitr.Iterator(expr.Val, expr.Type), nil - case *influxql.BinaryExpr: - if rhs, ok := expr.RHS.(influxql.Literal); ok { - // The right hand side is a literal. It is more common to have the RHS be a literal, - // so we check that one first and have this be the happy path. - if lhs, ok := expr.LHS.(influxql.Literal); ok { - // We have two literals that couldn't be combined by Reduce. - return nil, fmt.Errorf("unable to construct an iterator from two literals: LHS: %T, RHS: %T", lhs, rhs) - } - - lhs, err := buildAuxIterator(expr.LHS, aitr, opt) - if err != nil { - return nil, err - } - return buildRHSTransformIterator(lhs, rhs, expr.Op, opt) - } else if lhs, ok := expr.LHS.(influxql.Literal); ok { - rhs, err := buildAuxIterator(expr.RHS, aitr, opt) - if err != nil { - return nil, err - } - return buildLHSTransformIterator(lhs, rhs, expr.Op, opt) - } else { - // We have two iterators. Combine them into a single iterator. - lhs, err := buildAuxIterator(expr.LHS, aitr, opt) - if err != nil { - return nil, err - } - rhs, err := buildAuxIterator(expr.RHS, aitr, opt) - if err != nil { - return nil, err - } - return buildTransformIterator(lhs, rhs, expr.Op, opt) - } - case *influxql.ParenExpr: - return buildAuxIterator(expr.Expr, aitr, opt) - case *influxql.NilLiteral: - return &nilFloatIterator{}, nil - default: - return nil, fmt.Errorf("invalid expression type: %T", expr) - } -} - -// buildFieldIterators creates an iterator for each field expression. -func buildFieldIterators(ctx context.Context, fields influxql.Fields, ic IteratorCreator, sources influxql.Sources, opt IteratorOptions, selector, writeMode bool) ([]Iterator, error) { - // Create iterators from fields against the iterator creator. - itrs := make([]Iterator, len(fields)) - span := tracing.SpanFromContext(ctx) - - if err := func() error { - hasAuxFields := false - - var input Iterator - for i, f := range fields { - // Build iterators for calls first and save the iterator. - // We do this so we can keep the ordering provided by the user, but - // still build the Call's iterator first. - if influxql.ContainsVarRef(f.Expr) { - hasAuxFields = true - continue - } - - var localSpan *tracing.Span - localContext := ctx - - if span != nil { - localSpan = span.StartSpan("expression") - localSpan.SetLabels("expr", f.Expr.String()) - localContext = tracing.NewContextWithSpan(ctx, localSpan) - } - - expr := influxql.Reduce(f.Expr, nil) - itr, err := buildExprIterator(localContext, expr, ic, sources, opt, selector, writeMode) - - if localSpan != nil { - localSpan.Finish() - } - - if err != nil { - return err - } else if itr == nil { - itr = &nilFloatIterator{} - } - - // If there is a limit or offset then apply it. - if opt.Limit > 0 || opt.Offset > 0 { - itr = NewLimitIterator(itr, opt) - } - itrs[i] = itr - input = itr - } - - if input == nil || !hasAuxFields { - return nil - } - - // Build the aux iterators. Previous validation should ensure that only one - // call was present so we build an AuxIterator from that input. - aitr := NewAuxIterator(input, opt) - tryAddAuxIteratorToContext(ctx, aitr) - - for i, f := range fields { - if itrs[i] != nil { - itrs[i] = aitr - continue - } - - expr := influxql.Reduce(f.Expr, nil) - itr, err := buildAuxIterator(expr, aitr, opt) - if err != nil { - return err - } else if itr == nil { - itr = &nilFloatIterator{} - } - itrs[i] = itr - } - aitr.Start() - return nil - - }(); err != nil { - Iterators(Iterators(itrs).filterNonNil()).Close() - return nil, err - } - - return itrs, nil -} - // buildExprIterator creates an iterator for an expression. func buildExprIterator(ctx context.Context, expr influxql.Expr, ic IteratorCreator, sources influxql.Sources, opt IteratorOptions, selector, writeMode bool) (Iterator, error) { opt.Expr = expr @@ -487,12 +148,6 @@ func buildExprIterator(ctx context.Context, expr influxql.Expr, ic IteratorCreat return b.buildVarRefIterator(ctx, expr) case *influxql.Call: return b.buildCallIterator(ctx, expr) - case *influxql.BinaryExpr: - return b.buildBinaryExprIterator(ctx, expr) - case *influxql.ParenExpr: - return buildExprIterator(ctx, expr.Expr, ic, sources, opt, selector, writeMode) - case *influxql.NilLiteral: - return &nilFloatIterator{}, nil default: return nil, fmt.Errorf("invalid expression type: %T", expr) } @@ -831,40 +486,6 @@ func (b *exprIteratorBuilder) buildCallIterator(ctx context.Context, expr *influ return itr, nil } -func (b *exprIteratorBuilder) buildBinaryExprIterator(ctx context.Context, expr *influxql.BinaryExpr) (Iterator, error) { - if rhs, ok := expr.RHS.(influxql.Literal); ok { - // The right hand side is a literal. It is more common to have the RHS be a literal, - // so we check that one first and have this be the happy path. - if lhs, ok := expr.LHS.(influxql.Literal); ok { - // We have two literals that couldn't be combined by Reduce. - return nil, fmt.Errorf("unable to construct an iterator from two literals: LHS: %T, RHS: %T", lhs, rhs) - } - - lhs, err := buildExprIterator(ctx, expr.LHS, b.ic, b.sources, b.opt, b.selector, false) - if err != nil { - return nil, err - } - return buildRHSTransformIterator(lhs, rhs, expr.Op, b.opt) - } else if lhs, ok := expr.LHS.(influxql.Literal); ok { - rhs, err := buildExprIterator(ctx, expr.RHS, b.ic, b.sources, b.opt, b.selector, false) - if err != nil { - return nil, err - } - return buildLHSTransformIterator(lhs, rhs, expr.Op, b.opt) - } else { - // We have two iterators. Combine them into a single iterator. - lhs, err := buildExprIterator(ctx, expr.LHS, b.ic, b.sources, b.opt, false, false) - if err != nil { - return nil, err - } - rhs, err := buildExprIterator(ctx, expr.RHS, b.ic, b.sources, b.opt, false, false) - if err != nil { - return nil, err - } - return buildTransformIterator(lhs, rhs, expr.Op, b.opt) - } -} - func (b *exprIteratorBuilder) callIterator(ctx context.Context, expr *influxql.Call, opt IteratorOptions) (Iterator, error) { inputs := make([]Iterator, 0, len(b.sources)) if err := func() error { @@ -910,968 +531,292 @@ func (b *exprIteratorBuilder) callIterator(ctx context.Context, expr *influxql.C return itr, nil } -func buildRHSTransformIterator(lhs Iterator, rhs influxql.Literal, op influxql.Token, opt IteratorOptions) (Iterator, error) { - itrType, litType := iteratorDataType(lhs), literalDataType(rhs) - if litType == influxql.Unsigned && itrType == influxql.Integer { - // If the literal is unsigned but the iterator is an integer, return - // an error since we cannot add an unsigned to an integer. - return nil, fmt.Errorf("cannot use %s with an integer and unsigned", op) +func buildCursor(ctx context.Context, stmt *influxql.SelectStatement, ic IteratorCreator, opt IteratorOptions) (Cursor, error) { + switch opt.Fill { + case influxql.NumberFill: + if v, ok := opt.FillValue.(int); ok { + opt.FillValue = int64(v) + } + case influxql.PreviousFill: + opt.FillValue = SkipDefault } - fn := binaryExprFunc(iteratorDataType(lhs), literalDataType(rhs), op) - switch fn := fn.(type) { - case func(float64, float64) float64: - var input FloatIterator - switch lhs := lhs.(type) { - case FloatIterator: - input = lhs - case IntegerIterator: - input = &integerFloatCastIterator{input: lhs} - case UnsignedIterator: - input = &unsignedFloatCastIterator{input: lhs} - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs) - } - - var val float64 - switch rhs := rhs.(type) { - case *influxql.NumberLiteral: - val = rhs.Val - case *influxql.IntegerLiteral: - val = float64(rhs.Val) - case *influxql.UnsignedLiteral: - val = float64(rhs.Val) - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a NumberLiteral", rhs) - } - return &floatTransformIterator{ - input: input, - fn: func(p *FloatPoint) *FloatPoint { - if p == nil { - return nil - } else if p.Nil { - return p - } - p.Value = fn(p.Value, val) - return p + fields := make([]*influxql.Field, 0, len(stmt.Fields)+1) + if !stmt.OmitTime { + // Add a field with the variable "time" if we have not omitted time. + fields = append(fields, &influxql.Field{ + Expr: &influxql.VarRef{ + Val: "time", + Type: influxql.Time, }, - }, nil - case func(int64, int64) float64: - input, ok := lhs.(IntegerIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerIterator", lhs) - } - - var val int64 - switch rhs := rhs.(type) { - case *influxql.IntegerLiteral: - val = rhs.Val - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerLiteral", rhs) - } - return &integerFloatTransformIterator{ - input: input, - fn: func(p *IntegerPoint) *FloatPoint { - if p == nil { - return nil - } - - fp := &FloatPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Aux: p.Aux, - } - if p.Nil { - fp.Nil = true - } else { - fp.Value = fn(p.Value, val) - } - return fp - }, - }, nil - case func(float64, float64) bool: - var input FloatIterator - switch lhs := lhs.(type) { - case FloatIterator: - input = lhs - case IntegerIterator: - input = &integerFloatCastIterator{input: lhs} - case UnsignedIterator: - input = &unsignedFloatCastIterator{input: lhs} - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs) - } - - var val float64 - switch rhs := rhs.(type) { - case *influxql.NumberLiteral: - val = rhs.Val - case *influxql.IntegerLiteral: - val = float64(rhs.Val) - case *influxql.UnsignedLiteral: - val = float64(rhs.Val) - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a NumberLiteral", rhs) - } - return &floatBoolTransformIterator{ - input: input, - fn: func(p *FloatPoint) *BooleanPoint { - if p == nil { - return nil - } - - bp := &BooleanPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Aux: p.Aux, - } - if p.Nil { - bp.Nil = true - } else { - bp.Value = fn(p.Value, val) - } - return bp - }, - }, nil - case func(int64, int64) int64: - var input IntegerIterator - switch lhs := lhs.(type) { - case IntegerIterator: - input = lhs - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an IntegerIterator", lhs) - } - - var val int64 - switch rhs := rhs.(type) { - case *influxql.IntegerLiteral: - val = rhs.Val - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an IntegerLiteral", rhs) - } - return &integerTransformIterator{ - input: input, - fn: func(p *IntegerPoint) *IntegerPoint { - if p == nil { - return nil - } else if p.Nil { - return p - } - p.Value = fn(p.Value, val) - return p - }, - }, nil - case func(int64, int64) bool: - var input IntegerIterator - switch lhs := lhs.(type) { - case IntegerIterator: - input = lhs - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an IntegerIterator", lhs) - } - - var val int64 - switch rhs := rhs.(type) { - case *influxql.IntegerLiteral: - val = rhs.Val - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an IntegerLiteral", rhs) - } - return &integerBoolTransformIterator{ - input: input, - fn: func(p *IntegerPoint) *BooleanPoint { - if p == nil { - return nil - } - - bp := &BooleanPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Aux: p.Aux, - } - if p.Nil { - bp.Nil = true - } else { - bp.Value = fn(p.Value, val) - } - return bp - }, - }, nil - case func(uint64, uint64) uint64: - var input UnsignedIterator - switch lhs := lhs.(type) { - case UnsignedIterator: - input = lhs - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs) - } - - var val uint64 - switch rhs := rhs.(type) { - case *influxql.IntegerLiteral: - if rhs.Val < 0 { - return nil, fmt.Errorf("cannot use negative integer '%s' in math with unsigned", rhs) - } - val = uint64(rhs.Val) - case *influxql.UnsignedLiteral: - val = rhs.Val - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a NumberLiteral", rhs) - } - return &unsignedTransformIterator{ - input: input, - fn: func(p *UnsignedPoint) *UnsignedPoint { - if p == nil { - return nil - } else if p.Nil { - return p - } - p.Value = fn(p.Value, val) - return p - }, - }, nil - case func(uint64, uint64) bool: - var input UnsignedIterator - switch lhs := lhs.(type) { - case UnsignedIterator: - input = lhs - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs) - } - - var val uint64 - switch rhs := rhs.(type) { - case *influxql.IntegerLiteral: - if rhs.Val < 0 { - return nil, fmt.Errorf("cannot use negative integer '%s' in math with unsigned", rhs) - } - val = uint64(rhs.Val) - case *influxql.UnsignedLiteral: - val = rhs.Val - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a NumberLiteral", rhs) - } - return &unsignedBoolTransformIterator{ - input: input, - fn: func(p *UnsignedPoint) *BooleanPoint { - if p == nil { - return nil - } - - bp := &BooleanPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Aux: p.Aux, - } - if p.Nil { - bp.Nil = true - } else { - bp.Value = fn(p.Value, val) - } - return bp - }, - }, nil - case func(bool, bool) bool: - var input BooleanIterator - switch lhs := lhs.(type) { - case BooleanIterator: - input = lhs - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an BooleanIterator", lhs) - } - - var val bool - switch rhs := rhs.(type) { - case *influxql.BooleanLiteral: - val = rhs.Val - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an BooleanLiteral", rhs) - } - return &booleanTransformIterator{ - input: input, - fn: func(p *BooleanPoint) *BooleanPoint { - if p == nil { - return nil - } - - bp := &BooleanPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Aux: p.Aux, - } - if p.Nil { - bp.Nil = true - } else { - bp.Value = fn(p.Value, val) - } - return bp - }, - }, nil + }) } - return nil, fmt.Errorf("unable to construct rhs transform iterator from %T and %T", lhs, rhs) + + // Iterate through each of the fields to add them to the value mapper. + valueMapper := newValueMapper() + for _, f := range stmt.Fields { + fields = append(fields, valueMapper.Map(f)) + + // If the field is a top() or bottom() call, we need to also add + // the extra variables if we are not writing into a target. + if stmt.Target != nil { + continue + } + + switch expr := f.Expr.(type) { + case *influxql.Call: + if expr.Name == "top" || expr.Name == "bottom" { + for i := 1; i < len(expr.Args)-1; i++ { + nf := influxql.Field{Expr: expr.Args[i]} + fields = append(fields, valueMapper.Map(&nf)) + } + } + } + } + + // Set the aliases on each of the columns to what the final name should be. + columns := stmt.ColumnNames() + for i, f := range fields { + f.Alias = columns[i] + } + + // Retrieve the refs to retrieve the auxiliary fields. + var auxKeys []string + if len(valueMapper.refs) > 0 { + opt.Aux = make([]influxql.VarRef, 0, len(valueMapper.refs)) + for ref := range valueMapper.refs { + opt.Aux = append(opt.Aux, *ref) + } + sort.Sort(influxql.VarRefs(opt.Aux)) + + auxKeys = make([]string, len(opt.Aux)) + for i, ref := range opt.Aux { + auxKeys[i] = valueMapper.symbols[ref.String()] + } + } + + // If there are no calls, then produce an auxiliary cursor. + if len(valueMapper.calls) == 0 { + itr, err := buildAuxIterator(ctx, ic, stmt.Sources, opt) + if err != nil { + return nil, err + } + + keys := []string{""} + keys = append(keys, auxKeys...) + + scanner := NewIteratorScanner(itr, keys, opt.FillValue) + return newScannerCursor(scanner, fields, opt), nil + } + + // Check to see if this is a selector statement. + // It is a selector if it is the only selector call and the call itself + // is a selector. + selector := len(valueMapper.calls) == 1 + if selector { + for call := range valueMapper.calls { + if !influxql.IsSelector(call) { + selector = false + } + } + } + + // Produce an iterator for every single call and create an iterator scanner + // associated with it. + scanners := make([]IteratorScanner, 0, len(valueMapper.calls)) + for call, symbol := range valueMapper.calls { + itr, err := buildFieldIterator(ctx, call, ic, stmt.Sources, opt, selector, stmt.Target != nil) + if err != nil { + for _, s := range scanners { + s.Close() + } + return nil, err + } + + keys := make([]string, 0, len(auxKeys)+1) + keys = append(keys, symbol) + keys = append(keys, auxKeys...) + + scanner := NewIteratorScanner(itr, keys, opt.FillValue) + scanners = append(scanners, scanner) + } + + if len(scanners) == 1 { + return newScannerCursor(scanners[0], fields, opt), nil + } + return newMultiScannerCursor(scanners, fields, opt), nil } -func buildLHSTransformIterator(lhs influxql.Literal, rhs Iterator, op influxql.Token, opt IteratorOptions) (Iterator, error) { - litType, itrType := literalDataType(lhs), iteratorDataType(rhs) - if litType == influxql.Unsigned && itrType == influxql.Integer { - // If the literal is unsigned but the iterator is an integer, return - // an error since we cannot add an unsigned to an integer. - return nil, fmt.Errorf("cannot use %s with unsigned and an integer", op) +func buildAuxIterator(ctx context.Context, ic IteratorCreator, sources influxql.Sources, opt IteratorOptions) (Iterator, error) { + inputs := make([]Iterator, 0, len(sources)) + if err := func() error { + for _, source := range sources { + switch source := source.(type) { + case *influxql.Measurement: + input, err := ic.CreateIterator(ctx, source, opt) + if err != nil { + return err + } + inputs = append(inputs, input) + case *influxql.SubQuery: + b := subqueryBuilder{ + ic: ic, + stmt: source.Statement, + } + + input, err := b.buildAuxIterator(ctx, opt) + if err != nil { + return err + } + inputs = append(inputs, input) + } + } + return nil + }(); err != nil { + Iterators(inputs).Close() + return nil, err } - fn := binaryExprFunc(litType, itrType, op) - switch fn := fn.(type) { - case func(float64, float64) float64: - var input FloatIterator - switch rhs := rhs.(type) { - case FloatIterator: - input = rhs - case IntegerIterator: - input = &integerFloatCastIterator{input: rhs} - case UnsignedIterator: - input = &unsignedFloatCastIterator{input: rhs} - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs) - } - - var val float64 - switch lhs := lhs.(type) { - case *influxql.NumberLiteral: - val = lhs.Val - case *influxql.IntegerLiteral: - val = float64(lhs.Val) - case *influxql.UnsignedLiteral: - val = float64(lhs.Val) - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a NumberLiteral", lhs) - } - return &floatTransformIterator{ - input: input, - fn: func(p *FloatPoint) *FloatPoint { - if p == nil { - return nil - } else if p.Nil { - return p - } - p.Value = fn(val, p.Value) - return p - }, - }, nil - case func(int64, int64) float64: - input, ok := rhs.(IntegerIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerIterator", lhs) - } - - var val int64 - switch lhs := lhs.(type) { - case *influxql.IntegerLiteral: - val = lhs.Val - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerLiteral", rhs) - } - return &integerFloatTransformIterator{ - input: input, - fn: func(p *IntegerPoint) *FloatPoint { - if p == nil { - return nil - } - - fp := &FloatPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Aux: p.Aux, - } - if p.Nil { - fp.Nil = true - } else { - fp.Value = fn(val, p.Value) - } - return fp - }, - }, nil - case func(float64, float64) bool: - var input FloatIterator - switch rhs := rhs.(type) { - case FloatIterator: - input = rhs - case IntegerIterator: - input = &integerFloatCastIterator{input: rhs} - case UnsignedIterator: - input = &unsignedFloatCastIterator{input: rhs} - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs) - } - - var val float64 - switch lhs := lhs.(type) { - case *influxql.NumberLiteral: - val = lhs.Val - case *influxql.IntegerLiteral: - val = float64(lhs.Val) - case *influxql.UnsignedLiteral: - val = float64(lhs.Val) - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a NumberLiteral", lhs) - } - return &floatBoolTransformIterator{ - input: input, - fn: func(p *FloatPoint) *BooleanPoint { - if p == nil { - return nil - } - - bp := &BooleanPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Aux: p.Aux, - } - if p.Nil { - bp.Nil = true - } else { - bp.Value = fn(val, p.Value) - } - return bp - }, - }, nil - case func(int64, int64) int64: - var input IntegerIterator - switch rhs := rhs.(type) { - case IntegerIterator: - input = rhs - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an IntegerIterator", rhs) - } - - var val int64 - switch lhs := lhs.(type) { - case *influxql.IntegerLiteral: - val = lhs.Val - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an IntegerLiteral", lhs) - } - return &integerTransformIterator{ - input: input, - fn: func(p *IntegerPoint) *IntegerPoint { - if p == nil { - return nil - } else if p.Nil { - return p - } - p.Value = fn(val, p.Value) - return p - }, - }, nil - case func(int64, int64) bool: - var input IntegerIterator - switch rhs := rhs.(type) { - case IntegerIterator: - input = rhs - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an IntegerIterator", rhs) - } - - var val int64 - switch lhs := lhs.(type) { - case *influxql.IntegerLiteral: - val = lhs.Val - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an IntegerLiteral", lhs) - } - return &integerBoolTransformIterator{ - input: input, - fn: func(p *IntegerPoint) *BooleanPoint { - if p == nil { - return nil - } - - bp := &BooleanPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Aux: p.Aux, - } - if p.Nil { - bp.Nil = true - } else { - bp.Value = fn(val, p.Value) - } - return bp - }, - }, nil - case func(uint64, uint64) uint64: - var input UnsignedIterator - switch rhs := rhs.(type) { - case UnsignedIterator: - input = rhs - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs) - } - - var val uint64 - switch lhs := lhs.(type) { - case *influxql.IntegerLiteral: - if lhs.Val < 0 { - return nil, fmt.Errorf("cannot use negative integer '%s' in math with unsigned", rhs) - } - val = uint64(lhs.Val) - case *influxql.UnsignedLiteral: - val = lhs.Val - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a NumberLiteral", rhs) - } - return &unsignedTransformIterator{ - input: input, - fn: func(p *UnsignedPoint) *UnsignedPoint { - if p == nil { - return nil - } else if p.Nil { - return p - } - p.Value = fn(val, p.Value) - return p - }, - }, nil - case func(uint64, uint64) bool: - var input UnsignedIterator - switch rhs := rhs.(type) { - case UnsignedIterator: - input = rhs - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs) - } - - var val uint64 - switch lhs := lhs.(type) { - case *influxql.IntegerLiteral: - if lhs.Val < 0 { - return nil, fmt.Errorf("cannot use negative integer '%s' in math with unsigned", rhs) - } - val = uint64(lhs.Val) - case *influxql.UnsignedLiteral: - val = lhs.Val - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a NumberLiteral", rhs) - } - return &unsignedBoolTransformIterator{ - input: input, - fn: func(p *UnsignedPoint) *BooleanPoint { - if p == nil { - return nil - } - - bp := &BooleanPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Aux: p.Aux, - } - if p.Nil { - bp.Nil = true - } else { - bp.Value = fn(val, p.Value) - } - return bp - }, - }, nil - case func(bool, bool) bool: - var input BooleanIterator - switch rhs := rhs.(type) { - case BooleanIterator: - input = rhs - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an BooleanIterator", rhs) - } - - var val bool - switch lhs := lhs.(type) { - case *influxql.BooleanLiteral: - val = lhs.Val - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a BooleanLiteral", lhs) - } - return &booleanTransformIterator{ - input: input, - fn: func(p *BooleanPoint) *BooleanPoint { - if p == nil { - return nil - } - - bp := &BooleanPoint{ - Name: p.Name, - Tags: p.Tags, - Time: p.Time, - Aux: p.Aux, - } - if p.Nil { - bp.Nil = true - } else { - bp.Value = fn(val, p.Value) - } - return bp - }, - }, nil + // Merge iterators to read auxilary fields. + input, err := Iterators(inputs).Merge(opt) + if err != nil { + Iterators(inputs).Close() + return nil, err + } else if input == nil { + input = &nilFloatIterator{} } - return nil, fmt.Errorf("unable to construct lhs transform iterator from %T and %T", lhs, rhs) + + // Filter out duplicate rows, if required. + if opt.Dedupe { + // If there is no group by and it is a float iterator, see if we can use a fast dedupe. + if itr, ok := input.(FloatIterator); ok && len(opt.Dimensions) == 0 { + if sz := len(opt.Aux); sz > 0 && sz < 3 { + input = newFloatFastDedupeIterator(itr) + } else { + input = NewDedupeIterator(itr) + } + } else { + input = NewDedupeIterator(input) + } + } + // Apply limit & offset. + if opt.Limit > 0 || opt.Offset > 0 { + input = NewLimitIterator(input, opt) + } + return input, nil } -func buildTransformIterator(lhs Iterator, rhs Iterator, op influxql.Token, opt IteratorOptions) (Iterator, error) { - lhsType, rhsType := iteratorDataType(lhs), iteratorDataType(rhs) - if lhsType == influxql.Integer && rhsType == influxql.Unsigned { - return nil, fmt.Errorf("cannot use %s between an integer and unsigned, an explicit cast is required", op) - } else if lhsType == influxql.Unsigned && rhsType == influxql.Integer { - return nil, fmt.Errorf("cannot use %s between unsigned and an integer, an explicit cast is required", op) +func buildFieldIterator(ctx context.Context, expr influxql.Expr, ic IteratorCreator, sources influxql.Sources, opt IteratorOptions, selector, writeMode bool) (Iterator, error) { + input, err := buildExprIterator(ctx, expr, ic, sources, opt, selector, writeMode) + if err != nil { + return nil, err } - fn := binaryExprFunc(lhsType, rhsType, op) - switch fn := fn.(type) { - case func(float64, float64) float64: - var left FloatIterator - switch lhs := lhs.(type) { - case FloatIterator: - left = lhs - case IntegerIterator: - left = &integerFloatCastIterator{input: lhs} - case UnsignedIterator: - left = &unsignedFloatCastIterator{input: lhs} - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs) - } - - var right FloatIterator - switch rhs := rhs.(type) { - case FloatIterator: - right = rhs - case IntegerIterator: - right = &integerFloatCastIterator{input: rhs} - case UnsignedIterator: - right = &unsignedFloatCastIterator{input: rhs} - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs) - } - return newFloatExprIterator(left, right, opt, fn), nil - case func(int64, int64) float64: - left, ok := lhs.(IntegerIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerIterator", lhs) - } - right, ok := rhs.(IntegerIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerIterator", rhs) - } - return newIntegerFloatExprIterator(left, right, opt, fn), nil - case func(int64, int64) int64: - left, ok := lhs.(IntegerIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerIterator", lhs) - } - right, ok := rhs.(IntegerIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerIterator", rhs) - } - return newIntegerExprIterator(left, right, opt, fn), nil - case func(uint64, uint64) uint64: - left, ok := lhs.(UnsignedIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an UnsignedIterator", lhs) - } - right, ok := rhs.(UnsignedIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an UnsignedIterator", lhs) - } - return newUnsignedExprIterator(left, right, opt, fn), nil - case func(float64, float64) bool: - var left FloatIterator - switch lhs := lhs.(type) { - case FloatIterator: - left = lhs - case IntegerIterator: - left = &integerFloatCastIterator{input: lhs} - case UnsignedIterator: - left = &unsignedFloatCastIterator{input: lhs} - default: - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a FloatIterator", lhs) - } - - var right FloatIterator - switch rhs := rhs.(type) { - case FloatIterator: - right = rhs - case IntegerIterator: - right = &integerFloatCastIterator{input: rhs} - case UnsignedIterator: - right = &unsignedFloatCastIterator{input: rhs} - default: - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a FloatIterator", rhs) - } - return newFloatBooleanExprIterator(left, right, opt, fn), nil - case func(int64, int64) bool: - left, ok := lhs.(IntegerIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a IntegerIterator", lhs) - } - right, ok := rhs.(IntegerIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a IntegerIterator", rhs) - } - return newIntegerBooleanExprIterator(left, right, opt, fn), nil - case func(uint64, uint64) bool: - left, ok := lhs.(UnsignedIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as an UnsignedIterator", lhs) - } - right, ok := rhs.(UnsignedIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as an UnsignedIterator", lhs) - } - return newUnsignedBooleanExprIterator(left, right, opt, fn), nil - case func(bool, bool) bool: - left, ok := lhs.(BooleanIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on LHS, unable to use %T as a BooleanIterator", lhs) - } - right, ok := rhs.(BooleanIterator) - if !ok { - return nil, fmt.Errorf("type mismatch on RHS, unable to use %T as a BooleanIterator", rhs) - } - return newBooleanExprIterator(left, right, opt, fn), nil + // Apply limit & offset. + if opt.Limit > 0 || opt.Offset > 0 { + input = NewLimitIterator(input, opt) } - return nil, fmt.Errorf("unable to construct transform iterator from %T and %T", lhs, rhs) + return input, nil } -func iteratorDataType(itr Iterator) influxql.DataType { - switch itr.(type) { - case FloatIterator: - return influxql.Float - case IntegerIterator: - return influxql.Integer - case UnsignedIterator: - return influxql.Unsigned - case StringIterator: - return influxql.String - case BooleanIterator: - return influxql.Boolean - default: - return influxql.Unknown +type valueMapper struct { + // An index that maps a node's string output to its symbol so that all + // nodes with the same signature are mapped the same. + symbols map[string]string + // An index that maps a specific expression to a symbol. This ensures that + // only expressions that were mapped get symbolized. + table map[influxql.Expr]string + // A mapping of calls to their symbol. + calls map[*influxql.Call]string + // A mapping of variable references to their symbol. + refs map[*influxql.VarRef]string + i int +} + +func newValueMapper() *valueMapper { + return &valueMapper{ + symbols: make(map[string]string), + table: make(map[influxql.Expr]string), + calls: make(map[*influxql.Call]string), + refs: make(map[*influxql.VarRef]string), } } -func literalDataType(lit influxql.Literal) influxql.DataType { - switch lit.(type) { - case *influxql.NumberLiteral: - return influxql.Float - case *influxql.IntegerLiteral: - return influxql.Integer - case *influxql.UnsignedLiteral: - return influxql.Unsigned - case *influxql.StringLiteral: - return influxql.String - case *influxql.BooleanLiteral: - return influxql.Boolean - default: - return influxql.Unknown - } +func (v *valueMapper) Map(field *influxql.Field) *influxql.Field { + clone := *field + clone.Expr = influxql.CloneExpr(field.Expr) + + influxql.Walk(v, clone.Expr) + clone.Expr = influxql.RewriteExpr(clone.Expr, v.rewriteExpr) + return &clone } -func binaryExprFunc(typ1 influxql.DataType, typ2 influxql.DataType, op influxql.Token) interface{} { - var fn interface{} - switch typ1 { - case influxql.Float: - fn = floatBinaryExprFunc(op) - case influxql.Integer: - switch typ2 { - case influxql.Float: - fn = floatBinaryExprFunc(op) - case influxql.Unsigned: - // Special case for LT, LTE, GT, and GTE. - fn = unsignedBinaryExprFunc(op) - default: - fn = integerBinaryExprFunc(op) - } - case influxql.Unsigned: - switch typ2 { - case influxql.Float: - fn = floatBinaryExprFunc(op) - case influxql.Integer: - // Special case for LT, LTE, GT, and GTE. - // Since the RHS is an integer, we need to check if it is less than - // zero for the comparison operators to not be subject to overflow. - switch op { - case influxql.LT: - return func(lhs, rhs uint64) bool { - if int64(rhs) < 0 { - return false - } - return lhs < rhs - } - case influxql.LTE: - return func(lhs, rhs uint64) bool { - if int64(rhs) < 0 { - return false - } - return lhs <= rhs - } - case influxql.GT: - return func(lhs, rhs uint64) bool { - if int64(rhs) < 0 { - return true - } - return lhs > rhs - } - case influxql.GTE: - return func(lhs, rhs uint64) bool { - if int64(rhs) < 0 { - return true - } - return lhs >= rhs - } - } - fallthrough - default: - fn = unsignedBinaryExprFunc(op) - } - case influxql.Boolean: - fn = booleanBinaryExprFunc(op) +func (v *valueMapper) Visit(n influxql.Node) influxql.Visitor { + expr, ok := n.(influxql.Expr) + if !ok { + return v } - return fn -} -func floatBinaryExprFunc(op influxql.Token) interface{} { - switch op { - case influxql.ADD: - return func(lhs, rhs float64) float64 { return lhs + rhs } - case influxql.SUB: - return func(lhs, rhs float64) float64 { return lhs - rhs } - case influxql.MUL: - return func(lhs, rhs float64) float64 { return lhs * rhs } - case influxql.DIV: - return func(lhs, rhs float64) float64 { - if rhs == 0 { - return float64(0) + key := expr.String() + symbol, ok := v.symbols[key] + if !ok { + // This symbol has not been assigned yet. + // If this is a call or expression, store the node in + // the appropriate index. + symbol = fmt.Sprintf("val%d", v.i) + switch n := n.(type) { + case *influxql.Call: + if isMathFunction(n) { + return v } - return lhs / rhs + v.calls[n] = symbol + case *influxql.VarRef: + v.refs[n] = symbol + default: + return v } - case influxql.MOD: - return func(lhs, rhs float64) float64 { return math.Mod(lhs, rhs) } - case influxql.EQ: - return func(lhs, rhs float64) bool { return lhs == rhs } - case influxql.NEQ: - return func(lhs, rhs float64) bool { return lhs != rhs } - case influxql.LT: - return func(lhs, rhs float64) bool { return lhs < rhs } - case influxql.LTE: - return func(lhs, rhs float64) bool { return lhs <= rhs } - case influxql.GT: - return func(lhs, rhs float64) bool { return lhs > rhs } - case influxql.GTE: - return func(lhs, rhs float64) bool { return lhs >= rhs } + + // Assign this symbol to the symbol table if it is not presently there + // and increment the value index number. + v.symbols[key] = symbol + v.i++ } + // Store the symbol for this expression so we can later rewrite + // the query correctly. + v.table[expr] = symbol return nil } -func integerBinaryExprFunc(op influxql.Token) interface{} { - switch op { - case influxql.ADD: - return func(lhs, rhs int64) int64 { return lhs + rhs } - case influxql.SUB: - return func(lhs, rhs int64) int64 { return lhs - rhs } - case influxql.MUL: - return func(lhs, rhs int64) int64 { return lhs * rhs } - case influxql.DIV: - return func(lhs, rhs int64) float64 { - if rhs == 0 { - return float64(0) - } - return float64(lhs) / float64(rhs) - } - case influxql.MOD: - return func(lhs, rhs int64) int64 { - if rhs == 0 { - return int64(0) - } - return lhs % rhs - } - case influxql.BITWISE_AND: - return func(lhs, rhs int64) int64 { return lhs & rhs } - case influxql.BITWISE_OR: - return func(lhs, rhs int64) int64 { return lhs | rhs } - case influxql.BITWISE_XOR: - return func(lhs, rhs int64) int64 { return lhs ^ rhs } - case influxql.EQ: - return func(lhs, rhs int64) bool { return lhs == rhs } - case influxql.NEQ: - return func(lhs, rhs int64) bool { return lhs != rhs } - case influxql.LT: - return func(lhs, rhs int64) bool { return lhs < rhs } - case influxql.LTE: - return func(lhs, rhs int64) bool { return lhs <= rhs } - case influxql.GT: - return func(lhs, rhs int64) bool { return lhs > rhs } - case influxql.GTE: - return func(lhs, rhs int64) bool { return lhs >= rhs } +func (v *valueMapper) rewriteExpr(expr influxql.Expr) influxql.Expr { + symbol, ok := v.table[expr] + if !ok { + return expr + } + + valuer := influxql.TypeValuerEval{ + TypeMapper: influxql.MultiTypeMapper( + FunctionTypeMapper{}, + MathTypeMapper{}, + ), + } + typ, _ := valuer.EvalType(expr) + return &influxql.VarRef{ + Val: symbol, + Type: typ, } - return nil } -func unsignedBinaryExprFunc(op influxql.Token) interface{} { - switch op { - case influxql.ADD: - return func(lhs, rhs uint64) uint64 { return lhs + rhs } - case influxql.SUB: - return func(lhs, rhs uint64) uint64 { return lhs - rhs } - case influxql.MUL: - return func(lhs, rhs uint64) uint64 { return lhs * rhs } - case influxql.DIV: - return func(lhs, rhs uint64) uint64 { - if rhs == 0 { - return uint64(0) - } - return lhs / rhs +func validateTypes(stmt *influxql.SelectStatement) error { + valuer := influxql.TypeValuerEval{ + TypeMapper: influxql.MultiTypeMapper( + FunctionTypeMapper{}, + MathTypeMapper{}, + ), + } + for _, f := range stmt.Fields { + if _, err := valuer.EvalType(f.Expr); err != nil { + return err } - case influxql.MOD: - return func(lhs, rhs uint64) uint64 { - if rhs == 0 { - return uint64(0) - } - return lhs % rhs - } - case influxql.BITWISE_AND: - return func(lhs, rhs uint64) uint64 { return lhs & rhs } - case influxql.BITWISE_OR: - return func(lhs, rhs uint64) uint64 { return lhs | rhs } - case influxql.BITWISE_XOR: - return func(lhs, rhs uint64) uint64 { return lhs ^ rhs } - case influxql.EQ: - return func(lhs, rhs uint64) bool { return lhs == rhs } - case influxql.NEQ: - return func(lhs, rhs uint64) bool { return lhs != rhs } - case influxql.LT: - return func(lhs, rhs uint64) bool { return lhs < rhs } - case influxql.LTE: - return func(lhs, rhs uint64) bool { return lhs <= rhs } - case influxql.GT: - return func(lhs, rhs uint64) bool { return lhs > rhs } - case influxql.GTE: - return func(lhs, rhs uint64) bool { return lhs >= rhs } - } - return nil -} - -func booleanBinaryExprFunc(op influxql.Token) interface{} { - switch op { - case influxql.BITWISE_AND: - return func(lhs, rhs bool) bool { return lhs && rhs } - case influxql.BITWISE_OR: - return func(lhs, rhs bool) bool { return lhs || rhs } - case influxql.BITWISE_XOR: - return func(lhs, rhs bool) bool { return lhs != rhs } } return nil } diff --git a/query/select_test.go b/query/select_test.go index a4a944be21..be2b5427ca 100644 --- a/query/select_test.go +++ b/query/select_test.go @@ -2631,6 +2631,32 @@ func TestSelect(t *testing.T) { {Time: 22 * Second, Series: query.Series{Name: "cpu"}, Values: []interface{}{7.953140268154609}}, }, }, + { + name: "DuplicateSelectors", + q: `SELECT min(value) * 2, min(value) / 2 FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-02T00:00:00Z' GROUP BY time(10s), host fill(none)`, + typ: influxql.Float, + expr: `min(value::float)`, + itrs: []query.Iterator{ + &FloatIterator{Points: []query.FloatPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 0 * Second, Value: 20}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 11 * Second, Value: 3}, + {Name: "cpu", Tags: ParseTags("region=west,host=A"), Time: 31 * Second, Value: 100}, + }}, + &FloatIterator{Points: []query.FloatPoint{ + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 9 * Second, Value: 19}, + {Name: "cpu", Tags: ParseTags("region=east,host=A"), Time: 10 * Second, Value: 2}, + }}, + &FloatIterator{Points: []query.FloatPoint{ + {Name: "cpu", Tags: ParseTags("region=west,host=B"), Time: 5 * Second, Value: 10}, + }}, + }, + rows: []query.Row{ + {Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{float64(38), float64(19) / 2}}, + {Time: 10 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{float64(4), float64(1)}}, + {Time: 30 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=A")}, Values: []interface{}{float64(200), float64(50)}}, + {Time: 0 * Second, Series: query.Series{Name: "cpu", Tags: ParseTags("host=B")}, Values: []interface{}{float64(20), float64(5)}}, + }, + }, } { t.Run(tt.name, func(t *testing.T) { shardMapper := ShardMapper{ @@ -2866,7 +2892,7 @@ func TestSelect_BinaryExpr(t *testing.T) { { Name: "Integer_AdditionRHS_Unsigned", Statement: `SELECT i + 9223372036854775808 FROM cpu`, - Err: `cannot use + with an integer and unsigned`, + Err: `type error: i::integer + 9223372036854775808: cannot use + with an integer and unsigned literal`, }, { Name: "Unsigned_AdditionRHS_Unsigned", @@ -2943,7 +2969,7 @@ func TestSelect_BinaryExpr(t *testing.T) { { Name: "Integer_AdditionLHS_Unsigned", Statement: `SELECT 9223372036854775808 + i FROM cpu`, - Err: `cannot use + with unsigned and an integer`, + Err: `type error: 9223372036854775808 + i::integer: cannot use + with an integer and unsigned literal`, }, { Name: "Unsigned_AdditionLHS_Unsigned", @@ -3002,7 +3028,7 @@ func TestSelect_BinaryExpr(t *testing.T) { { Name: "Integer_Add_Unsigned", Statement: `SELECT i + u FROM cpu`, - Err: `cannot use + between an integer and unsigned, an explicit cast is required`, + Err: `type error: i::integer + u::unsigned: cannot use + between an integer and unsigned, an explicit cast is required`, }, { Name: "Float_MultiplicationRHS_Number", @@ -3167,7 +3193,7 @@ func TestSelect_BinaryExpr(t *testing.T) { { Name: "Integer_Multiply_Unsigned", Statement: `SELECT i * u FROM cpu`, - Err: `cannot use * between an integer and unsigned, an explicit cast is required`, + Err: `type error: i::integer * u::unsigned: cannot use * between an integer and unsigned, an explicit cast is required`, }, { Name: "Float_SubtractionRHS_Number", @@ -3235,7 +3261,7 @@ func TestSelect_BinaryExpr(t *testing.T) { { Name: "Integer_SubtractionRHS_Unsigned", Statement: `SELECT i - 9223372036854775808 FROM cpu`, - Err: `cannot use - with an integer and unsigned`, + Err: `type error: i::integer - 9223372036854775808: cannot use - with an integer and unsigned literal`, }, // Skip Unsigned_SubtractionRHS_Integer because it would result in underflow. { @@ -3304,7 +3330,7 @@ func TestSelect_BinaryExpr(t *testing.T) { { Name: "Integer_SubtractionLHS_Unsigned", Statement: `SELECT 9223372036854775808 - i FROM cpu`, - Err: `cannot use - with unsigned and an integer`, + Err: `type error: 9223372036854775808 - i::integer: cannot use - with an integer and unsigned literal`, }, { Name: "Unsigned_SubtractionLHS_Unsigned", @@ -3363,7 +3389,7 @@ func TestSelect_BinaryExpr(t *testing.T) { { Name: "Integer_Subtract_Unsigned", Statement: `SELECT i - u FROM cpu`, - Err: `cannot use - between an integer and unsigned, an explicit cast is required`, + Err: `type error: i::integer - u::unsigned: cannot use - between an integer and unsigned, an explicit cast is required`, }, { Name: "Float_DivisionRHS_Number", @@ -3431,7 +3457,7 @@ func TestSelect_BinaryExpr(t *testing.T) { { Name: "Integer_DivisionRHS_Unsigned", Statement: `SELECT i / 9223372036854775808 FROM cpu`, - Err: `cannot use / with an integer and unsigned`, + Err: `type error: i::integer / 9223372036854775808: cannot use / with an integer and unsigned literal`, }, { Name: "Unsigned_DivisionRHS_Unsigned", @@ -3508,7 +3534,7 @@ func TestSelect_BinaryExpr(t *testing.T) { { Name: "Integer_DivisionLHS_Unsigned", Statement: `SELECT 9223372036854775808 / i FROM cpu`, - Err: `cannot use / with unsigned and an integer`, + Err: `type error: 9223372036854775808 / i::integer: cannot use / with an integer and unsigned literal`, }, { Name: "Unsigned_DivisionLHS_Unsigned", @@ -3567,7 +3593,7 @@ func TestSelect_BinaryExpr(t *testing.T) { { Name: "Integer_Divide_Unsigned", Statement: `SELECT i / u FROM cpu`, - Err: `cannot use / between an integer and unsigned, an explicit cast is required`, + Err: `type error: i::integer / u::unsigned: cannot use / between an integer and unsigned, an explicit cast is required`, }, { Name: "Integer_BitwiseAndRHS", @@ -3741,9 +3767,9 @@ func TestSelect_BinaryExpr_NilValues(t *testing.T) { t.Fatalf("unexpected source: %s", m.Name) } return &FloatIterator{Points: []query.FloatPoint{ - {Name: "cpu", Time: 0 * Second, Value: 20, Aux: []interface{}{float64(20), nil}}, - {Name: "cpu", Time: 5 * Second, Value: 10, Aux: []interface{}{float64(10), float64(15)}}, - {Name: "cpu", Time: 9 * Second, Value: 19, Aux: []interface{}{nil, float64(5)}}, + {Name: "cpu", Time: 0 * Second, Aux: []interface{}{float64(20), nil}}, + {Name: "cpu", Time: 5 * Second, Aux: []interface{}{float64(10), float64(15)}}, + {Name: "cpu", Time: 9 * Second, Aux: []interface{}{nil, float64(5)}}, }}, nil }, } diff --git a/query/subquery.go b/query/subquery.go index 180b66d24a..144aec3786 100644 --- a/query/subquery.go +++ b/query/subquery.go @@ -30,11 +30,10 @@ func (b *subqueryBuilder) buildAuxIterator(ctx context.Context, opt IteratorOpti } subOpt.Aux = auxFields - itrs, err := buildIterators(ctx, b.stmt, b.ic, subOpt) + cur, err := buildCursor(ctx, b.stmt, b.ic, subOpt) if err != nil { return nil, err } - cur := b.buildCursor(itrs, subOpt.Ascending) // Construct the iterators for the subquery. input := NewIteratorMapper(cur, nil, indexes, subOpt) @@ -121,11 +120,10 @@ func (b *subqueryBuilder) buildVarRefIterator(ctx context.Context, expr *influxq } subOpt.Aux = auxFields - itrs, err := buildIterators(ctx, b.stmt, b.ic, subOpt) + cur, err := buildCursor(ctx, b.stmt, b.ic, subOpt) if err != nil { return nil, err } - cur := b.buildCursor(itrs, subOpt.Ascending) // Construct the iterators for the subquery. input := NewIteratorMapper(cur, driver, indexes, subOpt) @@ -135,18 +133,3 @@ func (b *subqueryBuilder) buildVarRefIterator(ctx context.Context, expr *influxq } return input, nil } - -func (b *subqueryBuilder) buildCursor(itrs []Iterator, ascending bool) Cursor { - columnNames := b.stmt.ColumnNames() - columns := make([]influxql.VarRef, len(itrs)) - for i, itr := range itrs { - columns[i] = influxql.VarRef{ - Val: columnNames[i+1], - Type: iteratorDataType(itr), - } - } - - cur := newCursor(itrs, columns, ascending) - cur.omitTime = true - return cur -} diff --git a/tests/server_test.go b/tests/server_test.go index 6a8adf4430..9a38cbdc6a 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -5570,13 +5570,12 @@ func TestServer_Query_PercentileDerivative(t *testing.T) { }, }...) - for i, query := range test.queries { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + + for _, query := range test.queries { t.Run(query.name, func(t *testing.T) { - if i == 0 { - if err := test.init(s); err != nil { - t.Fatalf("test init failed: %s", err) - } - } if query.skip { t.Skipf("SKIP:: %s", query.name) } @@ -5615,13 +5614,12 @@ func TestServer_Query_UnderscoreMeasurement(t *testing.T) { }, }...) - for i, query := range test.queries { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + + for _, query := range test.queries { t.Run(query.name, func(t *testing.T) { - if i == 0 { - if err := test.init(s); err != nil { - t.Fatalf("test init failed: %s", err) - } - } if query.skip { t.Skipf("SKIP:: %s", query.name) } diff --git a/tsdb/shard.go b/tsdb/shard.go index aec69160d8..a8956a2073 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -1221,6 +1221,11 @@ func (a Shards) MapType(measurement, field string) influxql.DataType { return typ } +func (a Shards) CallType(name string, args []influxql.DataType) (influxql.DataType, error) { + typmap := query.CallTypeMapper{} + return typmap.CallType(name, args) +} + func (a Shards) CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { switch measurement.SystemIterator { case "_series":