package query

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"regexp"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/influxdata/influxdb/pkg/tracing"
	internal "github.com/influxdata/influxdb/query/internal"
	"github.com/influxdata/influxql"
)

// ErrUnknownCall is returned when operating on an unknown function call.
var ErrUnknownCall = errors.New("unknown call")

const (
	// secToNs is the number of nanoseconds in a second.
	secToNs = int64(time.Second)
)

// Iterator represents a generic interface for all Iterators.
// Most iterator operations are done on the typed sub-interfaces.
type Iterator interface {
	Stats() IteratorStats
	Close() error
}

// Iterators represents a list of iterators.
type Iterators []Iterator

// Stats returns the aggregation of all iterator stats.
func (a Iterators) Stats() IteratorStats {
	var stats IteratorStats
	for _, itr := range a {
		stats.Add(itr.Stats())
	}
	return stats
}

// Close closes all iterators.
func (a Iterators) Close() error {
	for _, itr := range a {
		itr.Close()
	}
	return nil
}

// filterNonNil returns a slice of iterators with all nil iterators removed.
func (a Iterators) filterNonNil() []Iterator {
	other := make([]Iterator, 0, len(a))
	for _, itr := range a {
		if itr == nil {
			continue
		}
		other = append(other, itr)
	}
	return other
}

// dataType determines what slice type this set of iterators should be.
// An iterator type is chosen by looking at the first element in the slice
// and then returning the data type for that iterator.
func (a Iterators) dataType() influxql.DataType {
	if len(a) == 0 {
		return influxql.Unknown
	}

	switch a[0].(type) {
	case FloatIterator:
		return influxql.Float
	case IntegerIterator:
		return influxql.Integer
	case UnsignedIterator:
		return influxql.Unsigned
	case StringIterator:
		return influxql.String
	case BooleanIterator:
		return influxql.Boolean
	default:
		return influxql.Unknown
	}
}

// coerce forces an array of iterators to be a single type.
// Iterators that are not of the same type as the first element in the slice
// will be closed and dropped.
func (a Iterators) coerce() interface{} {
	typ := a.dataType()
	switch typ {
	case influxql.Float:
		return newFloatIterators(a)
	case influxql.Integer:
		return newIntegerIterators(a)
	case influxql.Unsigned:
		return newUnsignedIterators(a)
	case influxql.String:
		return newStringIterators(a)
	case influxql.Boolean:
		return newBooleanIterators(a)
	}
	return a
}
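
// A minimal usage sketch (floatItr1, floatItr2, and integerItr are
// hypothetical iterators): because the first element below is a
// FloatIterator, coerce returns a []FloatIterator holding the two float
// iterators, while integerItr is closed and dropped.
//
//	itrs := Iterators{floatItr1, floatItr2, integerItr}
//	floats := itrs.coerce().([]FloatIterator) // len(floats) == 2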

// Merge combines all iterators into a single iterator.
// A sorted merge iterator or a merge iterator can be used based on opt.
func (a Iterators) Merge(opt IteratorOptions) (Iterator, error) {
	// Check if this is a call expression.
	call, ok := opt.Expr.(*influxql.Call)

	// Merge into a single iterator.
	if !ok && opt.MergeSorted() {
		itr := NewSortedMergeIterator(a, opt)
		if itr != nil && opt.InterruptCh != nil {
			itr = NewInterruptIterator(itr, opt.InterruptCh)
		}
		return itr, nil
	}

	// We do not need an ordered output so use a merge iterator.
	itr := NewMergeIterator(a, opt)
	if itr == nil {
		return nil, nil
	}

	if opt.InterruptCh != nil {
		itr = NewInterruptIterator(itr, opt.InterruptCh)
	}

	if !ok {
		// This is not a call expression so do not use a call iterator.
		return itr, nil
	}

	// When merging the count() function, use sum() to sum the counted points.
	if call.Name == "count" {
		opt.Expr = &influxql.Call{
			Name: "sum",
			Args: call.Args,
		}
	}
	return NewCallIterator(itr, opt)
}
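
// A minimal usage sketch (itrs and opt are assumed to be built by the caller,
// e.g. one iterator per shard). If opt.Expr is a count() call, the partial
// counts from each input are re-aggregated with sum() as shown above.
//
//	merged, err := Iterators(itrs).Merge(opt)
//	if err != nil {
//		return err
//	} else if merged == nil {
//		return nil // no input iterators
//	}
//	defer merged.Close()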

// NewMergeIterator returns an iterator to merge itrs into one.
// Inputs must either be merge iterators or only contain a single name/tag in
// sorted order. The iterator will output all points by window, name/tag, then
// time. This iterator is useful when you need all of the points for an
// interval.
func NewMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator {
	inputs = Iterators(inputs).filterNonNil()
	if n := len(inputs); n == 0 {
		return nil
	} else if n == 1 {
		return inputs[0]
	}

	// Aggregate functions can use a more relaxed sorting so that points
	// within a window are grouped. This is much more efficient.
	switch inputs := Iterators(inputs).coerce().(type) {
	case []FloatIterator:
		return newFloatMergeIterator(inputs, opt)
	case []IntegerIterator:
		return newIntegerMergeIterator(inputs, opt)
	case []UnsignedIterator:
		return newUnsignedMergeIterator(inputs, opt)
	case []StringIterator:
		return newStringMergeIterator(inputs, opt)
	case []BooleanIterator:
		return newBooleanMergeIterator(inputs, opt)
	default:
		panic(fmt.Sprintf("unsupported merge iterator type: %T", inputs))
	}
}

// NewParallelMergeIterator returns an iterator that breaks input iterators
// into groups and processes them in parallel.
func NewParallelMergeIterator(inputs []Iterator, opt IteratorOptions, parallelism int) Iterator {
	inputs = Iterators(inputs).filterNonNil()
	if len(inputs) == 0 {
		return nil
	} else if len(inputs) == 1 {
		return inputs[0]
	}

	// Limit parallelism to the number of inputs.
	if len(inputs) < parallelism {
		parallelism = len(inputs)
	}

	// Determine the number of inputs per output iterator.
	n := len(inputs) / parallelism

	// Group iterators together.
	outputs := make([]Iterator, parallelism)
	for i := range outputs {
		var slice []Iterator
		if i < len(outputs)-1 {
			slice = inputs[i*n : (i+1)*n]
		} else {
			slice = inputs[i*n:]
		}

		outputs[i] = newParallelIterator(NewMergeIterator(slice, opt))
	}

	// Merge all groups together.
	return NewMergeIterator(outputs, opt)
}
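
// A worked example of the grouping arithmetic above: with 10 inputs and
// parallelism 3, n = 10/3 = 3, so the groups are inputs[0:3], inputs[3:6],
// and inputs[6:] (the final group absorbs the remainder, here 4 iterators).
// Each group is merged behind its own parallel iterator and the partial
// streams are then merged again into a single output.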

// NewSortedMergeIterator returns an iterator to merge itrs into one.
// Inputs must either be sorted merge iterators or only contain a single
// name/tag in sorted order. The iterator will output all points by name/tag,
// then time. This iterator is useful when you need all points for a name/tag
// to be in order.
func NewSortedMergeIterator(inputs []Iterator, opt IteratorOptions) Iterator {
	inputs = Iterators(inputs).filterNonNil()
	if len(inputs) == 0 {
		return nil
	} else if len(inputs) == 1 {
		return inputs[0]
	}

	switch inputs := Iterators(inputs).coerce().(type) {
	case []FloatIterator:
		return newFloatSortedMergeIterator(inputs, opt)
	case []IntegerIterator:
		return newIntegerSortedMergeIterator(inputs, opt)
	case []UnsignedIterator:
		return newUnsignedSortedMergeIterator(inputs, opt)
	case []StringIterator:
		return newStringSortedMergeIterator(inputs, opt)
	case []BooleanIterator:
		return newBooleanSortedMergeIterator(inputs, opt)
	default:
		panic(fmt.Sprintf("unsupported sorted merge iterator type: %T", inputs))
	}
}
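
// To illustrate the difference between the two merge orderings: given series
// cpu,host=a and cpu,host=b with points in two one-minute windows,
// NewMergeIterator emits window 1 for host=a, then window 1 for host=b,
// before moving on to window 2, while NewSortedMergeIterator emits every
// point for host=a before any point for host=b. The first suits windowed
// aggregates; the second suits raw, fully ordered output.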

// newParallelIterator returns an iterator that runs in a separate goroutine.
func newParallelIterator(input Iterator) Iterator {
	if input == nil {
		return nil
	}

	switch itr := input.(type) {
	case FloatIterator:
		return newFloatParallelIterator(itr)
	case IntegerIterator:
		return newIntegerParallelIterator(itr)
	case UnsignedIterator:
		return newUnsignedParallelIterator(itr)
	case StringIterator:
		return newStringParallelIterator(itr)
	case BooleanIterator:
		return newBooleanParallelIterator(itr)
	default:
		panic(fmt.Sprintf("unsupported parallel iterator type: %T", itr))
	}
}

// NewLimitIterator returns an iterator that limits the number of points per grouping.
func NewLimitIterator(input Iterator, opt IteratorOptions) Iterator {
	switch input := input.(type) {
	case FloatIterator:
		return newFloatLimitIterator(input, opt)
	case IntegerIterator:
		return newIntegerLimitIterator(input, opt)
	case UnsignedIterator:
		return newUnsignedLimitIterator(input, opt)
	case StringIterator:
		return newStringLimitIterator(input, opt)
	case BooleanIterator:
		return newBooleanLimitIterator(input, opt)
	default:
		panic(fmt.Sprintf("unsupported limit iterator type: %T", input))
	}
}

// NewFilterIterator returns an iterator that filters the points based on the
// condition. This iterator is not nearly as efficient as filtering points
// within the query engine and is only used when filtering subqueries.
func NewFilterIterator(input Iterator, cond influxql.Expr, opt IteratorOptions) Iterator {
	if input == nil {
		return nil
	}

	switch input := input.(type) {
	case FloatIterator:
		return newFloatFilterIterator(input, cond, opt)
	case IntegerIterator:
		return newIntegerFilterIterator(input, cond, opt)
	case UnsignedIterator:
		return newUnsignedFilterIterator(input, cond, opt)
	case StringIterator:
		return newStringFilterIterator(input, cond, opt)
	case BooleanIterator:
		return newBooleanFilterIterator(input, cond, opt)
	default:
		panic(fmt.Sprintf("unsupported filter iterator type: %T", input))
	}
}

// NewTagSubsetIterator strips each point it processes down to a subset of
// its tag key values.
func NewTagSubsetIterator(input Iterator, opt IteratorOptions) Iterator {
	if input == nil {
		return nil
	}

	switch input := input.(type) {
	case FloatIterator:
		return newFloatTagSubsetIterator(input, opt)
	case IntegerIterator:
		return newIntegerTagSubsetIterator(input, opt)
	case UnsignedIterator:
		return newUnsignedTagSubsetIterator(input, opt)
	case StringIterator:
		return newStringTagSubsetIterator(input, opt)
	case BooleanIterator:
		return newBooleanTagSubsetIterator(input, opt)
	default:
		panic(fmt.Sprintf("unsupported tag subset iterator type: %T", input))
	}
}

// NewDedupeIterator returns an iterator that only outputs unique points.
// This iterator maintains a serialized copy of each row so it is inefficient
// to use on large datasets. It is intended for small datasets such as meta queries.
func NewDedupeIterator(input Iterator) Iterator {
	if input == nil {
		return nil
	}

	switch input := input.(type) {
	case FloatIterator:
		return newFloatDedupeIterator(input)
	case IntegerIterator:
		return newIntegerDedupeIterator(input)
	case UnsignedIterator:
		return newUnsignedDedupeIterator(input)
	case StringIterator:
		return newStringDedupeIterator(input)
	case BooleanIterator:
		return newBooleanDedupeIterator(input)
	default:
		panic(fmt.Sprintf("unsupported dedupe iterator type: %T", input))
	}
}

// NewFillIterator returns an iterator that fills in missing points in an aggregate.
func NewFillIterator(input Iterator, expr influxql.Expr, opt IteratorOptions) Iterator {
	switch input := input.(type) {
	case FloatIterator:
		return newFloatFillIterator(input, expr, opt)
	case IntegerIterator:
		return newIntegerFillIterator(input, expr, opt)
	case UnsignedIterator:
		return newUnsignedFillIterator(input, expr, opt)
	case StringIterator:
		return newStringFillIterator(input, expr, opt)
	case BooleanIterator:
		return newBooleanFillIterator(input, expr, opt)
	default:
		panic(fmt.Sprintf("unsupported fill iterator type: %T", input))
	}
}

// NewIntervalIterator returns an iterator that sets the time on each point to the interval.
func NewIntervalIterator(input Iterator, opt IteratorOptions) Iterator {
	switch input := input.(type) {
	case FloatIterator:
		return newFloatIntervalIterator(input, opt)
	case IntegerIterator:
		return newIntegerIntervalIterator(input, opt)
	case UnsignedIterator:
		return newUnsignedIntervalIterator(input, opt)
	case StringIterator:
		return newStringIntervalIterator(input, opt)
	case BooleanIterator:
		return newBooleanIntervalIterator(input, opt)
	default:
		panic(fmt.Sprintf("unsupported interval iterator type: %T", input))
	}
}

// NewInterruptIterator returns an iterator that will stop producing output
// when the passed-in channel is closed.
func NewInterruptIterator(input Iterator, closing <-chan struct{}) Iterator {
	switch input := input.(type) {
	case FloatIterator:
		return newFloatInterruptIterator(input, closing)
	case IntegerIterator:
		return newIntegerInterruptIterator(input, closing)
	case UnsignedIterator:
		return newUnsignedInterruptIterator(input, closing)
	case StringIterator:
		return newStringInterruptIterator(input, closing)
	case BooleanIterator:
		return newBooleanInterruptIterator(input, closing)
	default:
		panic(fmt.Sprintf("unsupported interrupt iterator type: %T", input))
	}
}
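
// A minimal cancellation sketch (itr is a hypothetical iterator): closing the
// channel makes the wrapped iterator stop producing points, which is how
// query interruption is plumbed through opt.InterruptCh.
//
//	closing := make(chan struct{})
//	itr = NewInterruptIterator(itr, closing)
//	// ... later, from another goroutine:
//	close(closing)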

// NewCloseInterruptIterator returns an iterator that will invoke the Close() method on
// the input iterator when the passed-in channel has been closed.
func NewCloseInterruptIterator(input Iterator, closing <-chan struct{}) Iterator {
	switch input := input.(type) {
	case FloatIterator:
		return newFloatCloseInterruptIterator(input, closing)
	case IntegerIterator:
		return newIntegerCloseInterruptIterator(input, closing)
	case UnsignedIterator:
		return newUnsignedCloseInterruptIterator(input, closing)
	case StringIterator:
		return newStringCloseInterruptIterator(input, closing)
	case BooleanIterator:
		return newBooleanCloseInterruptIterator(input, closing)
	default:
		panic(fmt.Sprintf("unsupported close interrupt iterator type: %T", input))
	}
}

// IteratorScanner is used to scan the results of an iterator into a map.
type IteratorScanner interface {
	// Peek retrieves information about the next point. It returns a timestamp, the name, and the tags.
	Peek() (int64, string, Tags)

	// ScanAt will take a time, name, and tags and scan the point that matches those into the map.
	ScanAt(ts int64, name string, tags Tags, values map[string]interface{})

	// Stats returns the IteratorStats from the Iterator.
	Stats() IteratorStats

	// Err returns an error that was encountered while scanning.
	Err() error

	io.Closer
}

// SkipDefault is a sentinel value to tell the IteratorScanner to skip setting the
// default value if none was present. This causes the map to use the previous value
// if it was previously set.
var SkipDefault = interface{}(0)

// NewIteratorScanner produces an IteratorScanner for the Iterator.
func NewIteratorScanner(input Iterator, keys []influxql.VarRef, defaultValue interface{}) IteratorScanner {
	switch input := input.(type) {
	case FloatIterator:
		return newFloatIteratorScanner(input, keys, defaultValue)
	case IntegerIterator:
		return newIntegerIteratorScanner(input, keys, defaultValue)
	case UnsignedIterator:
		return newUnsignedIteratorScanner(input, keys, defaultValue)
	case StringIterator:
		return newStringIteratorScanner(input, keys, defaultValue)
	case BooleanIterator:
		return newBooleanIteratorScanner(input, keys, defaultValue)
	default:
		panic(fmt.Sprintf("unsupported type for iterator scanner: %T", input))
	}
}
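
// A minimal scan-loop sketch (itr and keys come from the caller; that Peek
// reports influxql.ZeroTime once the input is exhausted is an assumption
// about the typed scanners): Peek says where the next point lives and ScanAt
// copies the matching fields into the map.
//
//	scanner := NewIteratorScanner(itr, keys, SkipDefault)
//	defer scanner.Close()
//	for {
//		ts, name, tags := scanner.Peek()
//		if ts == influxql.ZeroTime {
//			break
//		}
//		values := make(map[string]interface{})
//		scanner.ScanAt(ts, name, tags, values)
//		// ... consume values ...
//	}
//	if err := scanner.Err(); err != nil {
//		// handle the scan error
//	}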

// DrainIterator reads and discards all points from itr.
func DrainIterator(itr Iterator) {
	defer itr.Close()
	switch itr := itr.(type) {
	case FloatIterator:
		for p, _ := itr.Next(); p != nil; p, _ = itr.Next() {
		}
	case IntegerIterator:
		for p, _ := itr.Next(); p != nil; p, _ = itr.Next() {
		}
	case UnsignedIterator:
		for p, _ := itr.Next(); p != nil; p, _ = itr.Next() {
		}
	case StringIterator:
		for p, _ := itr.Next(); p != nil; p, _ = itr.Next() {
		}
	case BooleanIterator:
		for p, _ := itr.Next(); p != nil; p, _ = itr.Next() {
		}
	default:
		panic(fmt.Sprintf("unsupported iterator type for draining: %T", itr))
	}
}

// DrainIterators reads and discards all points from itrs.
func DrainIterators(itrs []Iterator) {
	defer Iterators(itrs).Close()
	for {
		var hasData bool

		for _, itr := range itrs {
			switch itr := itr.(type) {
			case FloatIterator:
				if p, _ := itr.Next(); p != nil {
					hasData = true
				}
			case IntegerIterator:
				if p, _ := itr.Next(); p != nil {
					hasData = true
				}
			case UnsignedIterator:
				if p, _ := itr.Next(); p != nil {
					hasData = true
				}
			case StringIterator:
				if p, _ := itr.Next(); p != nil {
					hasData = true
				}
			case BooleanIterator:
				if p, _ := itr.Next(); p != nil {
					hasData = true
				}
			default:
				panic(fmt.Sprintf("unsupported iterator type for draining: %T", itr))
			}
		}

		// Exit once all iterators return a nil point.
		if !hasData {
			break
		}
	}
}

// NewReaderIterator returns an iterator that streams from a reader.
func NewReaderIterator(ctx context.Context, r io.Reader, typ influxql.DataType, stats IteratorStats) Iterator {
	switch typ {
	case influxql.Float:
		return newFloatReaderIterator(ctx, r, stats)
	case influxql.Integer:
		return newIntegerReaderIterator(ctx, r, stats)
	case influxql.Unsigned:
		return newUnsignedReaderIterator(ctx, r, stats)
	case influxql.String:
		return newStringReaderIterator(ctx, r, stats)
	case influxql.Boolean:
		return newBooleanReaderIterator(ctx, r, stats)
	default:
		return &nilFloatReaderIterator{r: r}
	}
}

// IteratorCreator is an interface to create Iterators.
type IteratorCreator interface {
	// Creates a simple iterator for use in an InfluxQL query.
	CreateIterator(ctx context.Context, source *influxql.Measurement, opt IteratorOptions) (Iterator, error)

	// Determines the potential cost for creating an iterator.
	IteratorCost(source *influxql.Measurement, opt IteratorOptions) (IteratorCost, error)
}

// IteratorOptions is an object passed to CreateIterator to specify creation options.
type IteratorOptions struct {
	// Expression to iterate for.
	// This can be VarRef or a Call.
	Expr influxql.Expr

	// Auxiliary tags or values to also retrieve for the point.
	Aux []influxql.VarRef

	// Data sources from which to receive data. This is only used for encoding
	// measurements over RPC and is no longer used in the open source version.
	Sources []influxql.Source

	// Group by interval and tags.
	Interval   Interval
	Dimensions []string            // The final dimensions of the query (stays the same even in subqueries).
	GroupBy    map[string]struct{} // Dimensions to group points by in intermediate iterators.
	Location   *time.Location

	// Fill options.
	Fill      influxql.FillOption
	FillValue interface{}

	// Condition to filter by.
	Condition influxql.Expr

	// Time range for the iterator.
	StartTime int64
	EndTime   int64

	// Sorted in time ascending order if true.
	Ascending bool

	// Limits the number of points per series.
	Limit, Offset int

	// Limits the number of series.
	SLimit, SOffset int

	// Removes the measurement name. Useful for meta queries.
	StripName bool

	// Removes duplicate rows from raw queries.
	Dedupe bool

	// Determines if this is a query for raw data or an aggregate/selector.
	Ordered bool

	// Limits on the creation of iterators.
	MaxSeriesN int

	// If this channel is set and is closed, the iterator should try to exit
	// and close as soon as possible.
	InterruptCh <-chan struct{}

	// Authorizer can limit access to data.
	Authorizer Authorizer
}
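
// A minimal construction sketch for a raw, ascending scan (ic, ctx, start,
// and end are assumed to come from the caller):
//
//	opt := IteratorOptions{
//		Expr:      &influxql.VarRef{Val: "value"},
//		StartTime: start.UnixNano(),
//		EndTime:   end.UnixNano(),
//		Ascending: true,
//		Ordered:   true,
//	}
//	itr, err := ic.CreateIterator(ctx, &influxql.Measurement{Name: "cpu"}, opt)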

// newIteratorOptionsStmt creates the iterator options from stmt.
func newIteratorOptionsStmt(stmt *influxql.SelectStatement, sopt SelectOptions) (opt IteratorOptions, err error) {
	// Determine time range from the condition.
	valuer := &influxql.NowValuer{Location: stmt.Location}
	condition, timeRange, err := influxql.ConditionExpr(stmt.Condition, valuer)
	if err != nil {
		return IteratorOptions{}, err
	}

	if !timeRange.Min.IsZero() {
		opt.StartTime = timeRange.Min.UnixNano()
	} else {
		opt.StartTime = influxql.MinTime
	}
	if !timeRange.Max.IsZero() {
		opt.EndTime = timeRange.Max.UnixNano()
	} else {
		opt.EndTime = influxql.MaxTime
	}
	opt.Location = stmt.Location

	// Determine group by interval.
	interval, err := stmt.GroupByInterval()
	if err != nil {
		return opt, err
	}
	// Set duration to zero if a negative interval has been used.
	if interval < 0 {
		interval = 0
	} else if interval > 0 {
		opt.Interval.Offset, err = stmt.GroupByOffset()
		if err != nil {
			return opt, err
		}
	}
	opt.Interval.Duration = interval

	// Always request an ordered output for the top level iterators.
	// The emitter will always emit points as ordered.
	opt.Ordered = true

	// Determine dimensions.
	opt.GroupBy = make(map[string]struct{}, len(opt.Dimensions))
	for _, d := range stmt.Dimensions {
		if d, ok := d.Expr.(*influxql.VarRef); ok {
			opt.Dimensions = append(opt.Dimensions, d.Val)
			opt.GroupBy[d.Val] = struct{}{}
		}
	}

	opt.Condition = condition
	opt.Ascending = stmt.TimeAscending()
	opt.Dedupe = stmt.Dedupe
	opt.StripName = stmt.StripName

	opt.Fill, opt.FillValue = stmt.Fill, stmt.FillValue
	if opt.Fill == influxql.NullFill && stmt.Target != nil {
		// Set the fill option to none if a target has been given.
		// Null values will get ignored when being written to the target
		// so fill(null) wouldn't write any null values to begin with.
		opt.Fill = influxql.NoFill
	}
	opt.Limit, opt.Offset = stmt.Limit, stmt.Offset
	opt.SLimit, opt.SOffset = stmt.SLimit, stmt.SOffset
	opt.MaxSeriesN = sopt.MaxSeriesN
	opt.Authorizer = sopt.Authorizer

	return opt, nil
}
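
// A worked example of the mapping above: for the statement
//
//	SELECT mean(value) FROM cpu WHERE time >= now() - 1h GROUP BY time(10m), host
//
// the resulting options cover the last hour in StartTime/EndTime,
// Interval.Duration is 10 minutes, Dimensions is []string{"host"} (with a
// matching GroupBy entry), and Ordered is set to true.

// newIteratorOptionsSubstatement creates the iterator options for a subquery,
// constraining them by the options of the outer query.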
|
|
|
|
|
2018-02-27 23:10:10 +00:00
|
|
|
func newIteratorOptionsSubstatement(ctx context.Context, stmt *influxql.SelectStatement, opt IteratorOptions) (IteratorOptions, error) {
|
2018-12-07 21:08:24 +00:00
|
|
|
subOpt, err := newIteratorOptionsStmt(stmt, SelectOptions{
|
|
|
|
Authorizer: opt.Authorizer,
|
|
|
|
MaxSeriesN: opt.MaxSeriesN,
|
|
|
|
})
|
2016-11-23 20:32:42 +00:00
|
|
|
if err != nil {
|
|
|
|
return IteratorOptions{}, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if subOpt.StartTime < opt.StartTime {
|
|
|
|
subOpt.StartTime = opt.StartTime
|
|
|
|
}
|
|
|
|
if subOpt.EndTime > opt.EndTime {
|
|
|
|
subOpt.EndTime = opt.EndTime
|
|
|
|
}
|
2018-02-27 23:10:10 +00:00
|
|
|
if !subOpt.Interval.IsZero() && subOpt.EndTime == influxql.MaxTime {
|
|
|
|
if now := ctx.Value("now"); now != nil {
|
|
|
|
subOpt.EndTime = now.(time.Time).UnixNano()
|
|
|
|
}
|
|
|
|
}
|
Update subqueries so groupings are propagated to inner queries
Previously, only time expressions got propagated inwards. The reason for
this was simple. If the outer query was going to filter to a specific
time range, then it would be unnecessary for the inner query to output
points within that time frame. It started as an optimization, but became
a feature because there was no reason to have the user repeat the same
time clause for the inner query as the outer query. So we allowed an
aggregate query with an interval to pass validation in the subquery if
the outer query had a time range. But `GROUP BY` clauses were not
propagated because that same logic didn't apply to them. It's not an
optimization there. So while grouping by a tag in the outer query
without grouping by it in the inner query was useless, there wasn't any
particular reason to care.
Then a bug was found where wildcards would propagate the dimensions
correctly, but the outer query containing a group by with the inner
query omitting it wouldn't correctly filter out the outer group by. We
could fix that filtering, but on further review, I had been seeing
people make that same mistake a lot. People seem to just believe that
the grouping should be propagated inwards. Instead of trying to fight
what the user wanted and explicitly erase groupings that weren't
propagated manually, we might as well just propagate them for the user
to make their lives easier. There is no useful situation where you would
want to group into buckets that can't physically exist so we might as
well do _something_ useful.
This will also now propagate time intervals to inner queries since the
same applies there. But, while the interval propagates, the following
query will not pass validation since it is still not possible to use a
grouping interval with a raw query (even if the inner query is an
aggregate):
SELECT * FROM (SELECT mean(value) FROM cpu) WHERE time > now() - 5m GROUP BY time(1m)
This also means wildcards will behave a bit differently. They will
retrieve dimensions from the sources in the inner query rather than just
using the dimensions in the group by.
Fixing top() and bottom() to return the correct auxiliary fields.
Unfortunately, we were not copying the buffer with the auxiliary fields
so those values would be overwritten by a later point.
2017-01-17 19:48:20 +00:00
|
|
|
// Propagate the dimensions to the inner subquery.
|
2016-11-23 20:32:42 +00:00
|
|
|
subOpt.Dimensions = opt.Dimensions
|
Update subqueries so groupings are propagated to inner queries
Previously, only time expressions got propagated inwards. The reason for
this was simple. If the outer query was going to filter to a specific
time range, then it would be unnecessary for the inner query to output
points within that time frame. It started as an optimization, but became
a feature because there was no reason to have the user repeat the same
time clause for the inner query as the outer query. So we allowed an
aggregate query with an interval to pass validation in the subquery if
the outer query had a time range. But `GROUP BY` clauses were not
propagated because that same logic didn't apply to them. It's not an
optimization there. So while grouping by a tag in the outer query
without grouping by it in the inner query was useless, there wasn't any
particular reason to care.
Then a bug was found where wildcards would propagate the dimensions
correctly, but the outer query containing a group by with the inner
query omitting it wouldn't correctly filter out the outer group by. We
could fix that filtering, but on further review, I had been seeing
people make that same mistake a lot. People seem to just believe that
the grouping should be propagated inwards. Instead of trying to fight
what the user wanted and explicitly erase groupings that weren't
propagated manually, we might as well just propagate them for the user
to make their lives easier. There is no useful situation where you would
want to group into buckets that can't physically exist so we might as
well do _something_ useful.
This will also now propagate time intervals to inner queries since the
same applies there. But, while the interval propagates, the following
query will not pass validation since it is still not possible to use a
grouping interval with a raw query (even if the inner query is an
aggregate):
SELECT * FROM (SELECT mean(value) FROM cpu) WHERE time > now() - 5m GROUP BY time(1m)
This also means wildcards will behave a bit differently. They will
retrieve dimensions from the sources in the inner query rather than just
using the dimensions in the group by.
Fixing top() and bottom() to return the correct auxiliary fields.
Unfortunately, we were not copying the buffer with the auxiliary fields
so those values would be overwritten by a later point.
2017-01-17 19:48:20 +00:00
|
|
|
for d := range opt.GroupBy {
|
|
|
|
subOpt.GroupBy[d] = struct{}{}
|
|
|
|
}
|
2016-11-23 20:32:42 +00:00
|
|
|
subOpt.InterruptCh = opt.InterruptCh
|
|
|
|
|
2017-09-08 16:55:53 +00:00
|
|
|
// Extract the time range and condition from the condition.
|
|
|
|
cond, t, err := influxql.ConditionExpr(stmt.Condition, nil)
|
|
|
|
if err != nil {
|
|
|
|
return IteratorOptions{}, err
|
|
|
|
}
|
|
|
|
subOpt.Condition = cond
|
|
|
|
// If the time range is more constrained, use it instead. A less constrained time
|
|
|
|
// range should be ignored.
|
2017-11-03 16:53:23 +00:00
|
|
|
if !t.Min.IsZero() && t.MinTimeNano() > opt.StartTime {
|
|
|
|
subOpt.StartTime = t.MinTimeNano()
|
2017-09-08 16:55:53 +00:00
|
|
|
}
|
2017-11-03 16:53:23 +00:00
|
|
|
if !t.Max.IsZero() && t.MaxTimeNano() < opt.EndTime {
|
|
|
|
subOpt.EndTime = t.MaxTimeNano()
|
2017-09-08 16:55:53 +00:00
|
|
|
}
|
|
|
|
|
2016-11-23 20:32:42 +00:00
|
|
|
// Propagate the SLIMIT and SOFFSET from the outer query.
|
|
|
|
subOpt.SLimit += opt.SLimit
|
|
|
|
subOpt.SOffset += opt.SOffset
|
|
|
|
|
2017-08-28 20:22:31 +00:00
|
|
|
// Propagate the ordering from the parent query.
|
|
|
|
subOpt.Ascending = opt.Ascending
|
|
|
|
|
2017-09-22 14:57:32 +00:00
|
|
|
// If the inner query uses a null fill option and is not a raw query,
|
|
|
|
// switch it to none so we don't hit an unnecessary penalty from the
|
|
|
|
// fill iterator. Null values will end up getting stripped by an outer
|
|
|
|
// query anyway so there's no point in having them here. We still need
|
|
|
|
// all other types of fill iterators because they can affect the result
|
|
|
|
// of the outer query. We also do not do this for raw queries because
|
|
|
|
// there is no fill iterator for them and fill(none) doesn't work with
|
|
|
|
// raw queries.
|
|
|
|
if !stmt.IsRawQuery && subOpt.Fill == influxql.NullFill {
|
2017-08-15 19:24:22 +00:00
|
|
|
subOpt.Fill = influxql.NoFill
|
2016-11-23 20:32:42 +00:00
|
|
|
}
|
|
|
|
|
2017-04-11 15:30:06 +00:00
|
|
|
// Inherit the ordering method from the outer query.
|
|
|
|
subOpt.Ordered = opt.Ordered
|

	// If there is no interval for this subquery, but the outer query has an
	// interval, inherit the parent interval.
	interval, err := stmt.GroupByInterval()
	if err != nil {
		return IteratorOptions{}, err
	} else if interval == 0 {
		subOpt.Interval = opt.Interval
	}
	return subOpt, nil
}

// MergeSorted returns true if the options require a sorted merge.
func (opt IteratorOptions) MergeSorted() bool {
	return opt.Ordered
}

// SeekTime returns the time the iterator should start from.
// For ascending iterators this is the start time, for descending iterators it's the end time.
func (opt IteratorOptions) SeekTime() int64 {
	if opt.Ascending {
		return opt.StartTime
	}
	return opt.EndTime
}

// StopTime returns the time the iterator should end at.
// For ascending iterators this is the end time, for descending iterators it's the start time.
func (opt IteratorOptions) StopTime() int64 {
	if opt.Ascending {
		return opt.EndTime
	}
	return opt.StartTime
}
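
// seekStopExample is an illustrative sketch, not part of the original API:
// it shows how SeekTime and StopTime swap roles depending on the iteration
// direction. For a descending iterator over [10, 20], iteration seeks from
// 20 and stops at 10.
func seekStopExample() (seek, stop int64) {
	opt := IteratorOptions{StartTime: 10, EndTime: 20, Ascending: false}
	return opt.SeekTime(), opt.StopTime() // 20, 10; flipping Ascending yields 10, 20
}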

// Window returns the time window [start,end) that t falls within.
func (opt IteratorOptions) Window(t int64) (start, end int64) {
	if opt.Interval.IsZero() {
		return opt.StartTime, opt.EndTime + 1
	}

	// Subtract the offset from the time so we calculate the correct base interval.
	t -= int64(opt.Interval.Offset)

	// Retrieve the zone offset for the start time.
	var zone int64
	if opt.Location != nil {
		_, zone = opt.Zone(t)
	}

	// Truncate time by duration.
	dt := (t + zone) % int64(opt.Interval.Duration)
	if dt < 0 {
		// Negative modulo rounds up instead of down, so offset
		// with the duration.
		dt += int64(opt.Interval.Duration)
	}

	// Find the start time.
	if influxql.MinTime+dt >= t {
		start = influxql.MinTime
	} else {
		start = t - dt
	}

	// Look up the zone offset again because the first lookup may have been
	// after the offset switch. Now that we are aligned to the start of the
	// interval, we can look up the zone offset again to get the real
	// starting offset.
	if opt.Location != nil {
		_, startOffset := opt.Zone(start)
		// Do not adjust the offset if the offset change is greater than or
		// equal to the duration.
		if o := zone - startOffset; o != 0 && abs(o) < int64(opt.Interval.Duration) {
			start += o
		}
	}
	start += int64(opt.Interval.Offset)

	// Find the end time.
	if dt := int64(opt.Interval.Duration) - dt; influxql.MaxTime-dt <= t {
		end = influxql.MaxTime
	} else {
		end = t + dt
	}

	// Retrieve the zone offset for the end time.
	if opt.Location != nil {
		_, endOffset := opt.Zone(end)
		// Adjust the end time if the offset is different from the start offset.
		// Only apply the offset if it is smaller than the duration.
		// This prevents going back in time and creating time windows
		// that don't make any sense.
		if o := zone - endOffset; o != 0 && abs(o) < int64(opt.Interval.Duration) {
			// If the offset is greater than 0, we are adding time and the
			// added time belongs to the previous interval because the clocks
			// move backwards. If the offset is less than 0, we are skipping
			// time. Skipped time comes after the switch, so a time interval
			// that lands on the switch belongs to the next interval rather
			// than the current one. For that reason, we need to know when
			// the actual switch happens by checking whether it falls within
			// the current interval: we recompute the zone offset at the
			// adjusted end time and apply the offset only if it matches.
			if o > 0 {
				end += o
			} else if _, z := opt.Zone(end + o); z == endOffset {
				end += o
			}
		}
	}
	end += int64(opt.Interval.Offset)
	return
}
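
// windowExample is an illustrative sketch, not part of the original API: it
// shows how Window buckets a timestamp. With a one-minute interval, no
// offset, and no location, a point at t=90s lands in the window [60s, 120s)
// because dt = t % 60s = 30s, start = t - dt, and end = start + 60s.
func windowExample() (start, end int64) {
	opt := IteratorOptions{
		Interval:  Interval{Duration: time.Minute},
		StartTime: influxql.MinTime,
		EndTime:   influxql.MaxTime,
	}
	return opt.Window(90 * secToNs) // 60 * secToNs, 120 * secToNs
}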

// DerivativeInterval returns the time interval for the derivative function.
func (opt IteratorOptions) DerivativeInterval() Interval {
	// Use the interval on the derivative() call, if specified.
	if expr, ok := opt.Expr.(*influxql.Call); ok && len(expr.Args) == 2 {
		return Interval{Duration: expr.Args[1].(*influxql.DurationLiteral).Val}
	}

	// Otherwise use the group by interval, if specified.
	if opt.Interval.Duration > 0 {
		return Interval{Duration: opt.Interval.Duration}
	}

	return Interval{Duration: time.Second}
}

// ElapsedInterval returns the time interval for the elapsed function.
func (opt IteratorOptions) ElapsedInterval() Interval {
	// Use the interval on the elapsed() call, if specified.
	if expr, ok := opt.Expr.(*influxql.Call); ok && len(expr.Args) == 2 {
		return Interval{Duration: expr.Args[1].(*influxql.DurationLiteral).Val}
	}

	return Interval{Duration: time.Nanosecond}
}

// IntegralInterval returns the time interval for the integral function.
func (opt IteratorOptions) IntegralInterval() Interval {
	// Use the interval on the integral() call, if specified.
	if expr, ok := opt.Expr.(*influxql.Call); ok && len(expr.Args) == 2 {
		return Interval{Duration: expr.Args[1].(*influxql.DurationLiteral).Val}
	}

	return Interval{Duration: time.Second}
}
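
// intervalDefaultsExample is an illustrative sketch, not part of the
// original API: with no call argument and no GROUP BY time() interval, each
// helper above falls back to its documented default.
func intervalDefaultsExample() {
	var opt IteratorOptions
	_ = opt.DerivativeInterval() // Interval{Duration: time.Second}
	_ = opt.ElapsedInterval()    // Interval{Duration: time.Nanosecond}
	_ = opt.IntegralInterval()   // Interval{Duration: time.Second}
}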

// GetDimensions retrieves the dimensions for this query.
func (opt IteratorOptions) GetDimensions() []string {
	if len(opt.GroupBy) > 0 {
		dimensions := make([]string, 0, len(opt.GroupBy))
		for dim := range opt.GroupBy {
			dimensions = append(dimensions, dim)
		}
		return dimensions
	}
	return opt.Dimensions
}
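
// getDimensionsExample is an illustrative sketch, not part of the original
// API: an explicit GroupBy map takes precedence over the Dimensions slice,
// and the returned order is unspecified because it ranges over a map.
func getDimensionsExample() []string {
	opt := IteratorOptions{
		Dimensions: []string{"host"},
		GroupBy:    map[string]struct{}{"region": {}},
	}
	return opt.GetDimensions() // ["region"], not ["host"]
}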

// Zone returns the zone information for the given time. The offset is in nanoseconds.
func (opt *IteratorOptions) Zone(ns int64) (string, int64) {
	if opt.Location == nil {
		return "", 0
	}

	t := time.Unix(0, ns).In(opt.Location)
	name, offset := t.Zone()
	return name, secToNs * int64(offset)
}
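
// zoneExample is an illustrative sketch, not part of the original API:
// time.Time.Zone reports the offset in seconds, so Zone scales it to
// nanoseconds with secToNs. At the Unix epoch, America/Los_Angeles is on
// standard time, eight hours behind UTC.
func zoneExample() (string, int64) {
	loc, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		return "", 0
	}
	opt := IteratorOptions{Location: loc}
	return opt.Zone(0) // "PST", -8 * 3600 * secToNs
}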

// MarshalBinary encodes opt into a binary format.
func (opt *IteratorOptions) MarshalBinary() ([]byte, error) {
	return proto.Marshal(encodeIteratorOptions(opt))
}

// UnmarshalBinary decodes from a binary format into opt.
func (opt *IteratorOptions) UnmarshalBinary(buf []byte) error {
	var pb internal.IteratorOptions
	if err := proto.Unmarshal(buf, &pb); err != nil {
		return err
	}

	other, err := decodeIteratorOptions(&pb)
	if err != nil {
		return err
	}
	*opt = *other

	return nil
}
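
// optionsRoundTripExample is an illustrative sketch, not part of the
// original API: options survive a marshal/unmarshal round trip, which is
// what allows them to be shipped across process boundaries.
func optionsRoundTripExample(opt *IteratorOptions) (*IteratorOptions, error) {
	buf, err := opt.MarshalBinary()
	if err != nil {
		return nil, err
	}
	var other IteratorOptions
	if err := other.UnmarshalBinary(buf); err != nil {
		return nil, err
	}
	return &other, nil
}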

func encodeIteratorOptions(opt *IteratorOptions) *internal.IteratorOptions {
	pb := &internal.IteratorOptions{
		Interval:   encodeInterval(opt.Interval),
		Dimensions: opt.Dimensions,
		Fill:       proto.Int32(int32(opt.Fill)),
		StartTime:  proto.Int64(opt.StartTime),
		EndTime:    proto.Int64(opt.EndTime),
		Ascending:  proto.Bool(opt.Ascending),
		Limit:      proto.Int64(int64(opt.Limit)),
		Offset:     proto.Int64(int64(opt.Offset)),
		SLimit:     proto.Int64(int64(opt.SLimit)),
		SOffset:    proto.Int64(int64(opt.SOffset)),
		StripName:  proto.Bool(opt.StripName),
		Dedupe:     proto.Bool(opt.Dedupe),
		MaxSeriesN: proto.Int64(int64(opt.MaxSeriesN)),
		Ordered:    proto.Bool(opt.Ordered),
	}

	// Set expression, if set.
	if opt.Expr != nil {
		pb.Expr = proto.String(opt.Expr.String())
	}

	// Set the location, if set.
	if opt.Location != nil {
		pb.Location = proto.String(opt.Location.String())
	}

	// Convert and encode aux fields as variable references.
	if opt.Aux != nil {
		pb.Fields = make([]*internal.VarRef, len(opt.Aux))
		pb.Aux = make([]string, len(opt.Aux))
		for i, ref := range opt.Aux {
			pb.Fields[i] = encodeVarRef(ref)
			pb.Aux[i] = ref.Val
		}
	}

	// Encode group by dimensions from a map.
	if opt.GroupBy != nil {
		dimensions := make([]string, 0, len(opt.GroupBy))
		for dim := range opt.GroupBy {
			dimensions = append(dimensions, dim)
		}
		pb.GroupBy = dimensions
	}

	// Convert and encode sources to measurements.
	if opt.Sources != nil {
		sources := make([]*internal.Measurement, len(opt.Sources))
		for i, source := range opt.Sources {
			mm := source.(*influxql.Measurement)
			sources[i] = encodeMeasurement(mm)
		}
		pb.Sources = sources
	}

	// Fill value can only be a number. Set it if available.
	if v, ok := opt.FillValue.(float64); ok {
		pb.FillValue = proto.Float64(v)
	}

	// Set condition, if set.
	if opt.Condition != nil {
		pb.Condition = proto.String(opt.Condition.String())
	}

	return pb
}

func decodeIteratorOptions(pb *internal.IteratorOptions) (*IteratorOptions, error) {
	opt := &IteratorOptions{
		Interval:   decodeInterval(pb.GetInterval()),
		Dimensions: pb.GetDimensions(),
		Fill:       influxql.FillOption(pb.GetFill()),
		StartTime:  pb.GetStartTime(),
		EndTime:    pb.GetEndTime(),
		Ascending:  pb.GetAscending(),
		Limit:      int(pb.GetLimit()),
		Offset:     int(pb.GetOffset()),
		SLimit:     int(pb.GetSLimit()),
		SOffset:    int(pb.GetSOffset()),
		StripName:  pb.GetStripName(),
		Dedupe:     pb.GetDedupe(),
		MaxSeriesN: int(pb.GetMaxSeriesN()),
		Ordered:    pb.GetOrdered(),
	}

	// Set expression, if set.
	if pb.Expr != nil {
		expr, err := influxql.ParseExpr(pb.GetExpr())
		if err != nil {
			return nil, err
		}
		opt.Expr = expr
	}

	// Set the location, if set.
	if pb.Location != nil {
		loc, err := time.LoadLocation(pb.GetLocation())
		if err != nil {
			return nil, err
		}
		opt.Location = loc
	}

	// Convert and decode variable references.
	if fields := pb.GetFields(); fields != nil {
		opt.Aux = make([]influxql.VarRef, len(fields))
		for i, ref := range fields {
			opt.Aux[i] = decodeVarRef(ref)
		}
	} else if aux := pb.GetAux(); aux != nil {
		opt.Aux = make([]influxql.VarRef, len(aux))
		for i, name := range aux {
			opt.Aux[i] = influxql.VarRef{Val: name}
		}
	}

	// Convert and decode sources to measurements.
	if pb.Sources != nil {
		sources := make([]influxql.Source, len(pb.GetSources()))
		for i, source := range pb.GetSources() {
			mm, err := decodeMeasurement(source)
			if err != nil {
				return nil, err
			}
			sources[i] = mm
		}
		opt.Sources = sources
	}

	// Convert group by dimensions to a map.
	if pb.GroupBy != nil {
		dimensions := make(map[string]struct{}, len(pb.GroupBy))
		for _, dim := range pb.GetGroupBy() {
			dimensions[dim] = struct{}{}
		}
		opt.GroupBy = dimensions
	}

	// Set the fill value, if set.
	if pb.FillValue != nil {
		opt.FillValue = pb.GetFillValue()
	}

	// Set condition, if set.
	if pb.Condition != nil {
		expr, err := influxql.ParseExpr(pb.GetCondition())
		if err != nil {
			return nil, err
		}
		opt.Condition = expr
	}

	return opt, nil
}

func encodeMeasurement(mm *influxql.Measurement) *internal.Measurement {
	pb := &internal.Measurement{
		Database:        proto.String(mm.Database),
		RetentionPolicy: proto.String(mm.RetentionPolicy),
		Name:            proto.String(mm.Name),
		SystemIterator:  proto.String(mm.SystemIterator),
		IsTarget:        proto.Bool(mm.IsTarget),
	}
	if mm.Regex != nil {
		pb.Regex = proto.String(mm.Regex.Val.String())
	}
	return pb
}

func decodeMeasurement(pb *internal.Measurement) (*influxql.Measurement, error) {
	mm := &influxql.Measurement{
		Database:        pb.GetDatabase(),
		RetentionPolicy: pb.GetRetentionPolicy(),
		Name:            pb.GetName(),
		SystemIterator:  pb.GetSystemIterator(),
		IsTarget:        pb.GetIsTarget(),
	}

	if pb.Regex != nil {
		regex, err := regexp.Compile(pb.GetRegex())
		if err != nil {
			return nil, fmt.Errorf("invalid binary measurement regex: value=%q, err=%s", pb.GetRegex(), err)
		}
		mm.Regex = &influxql.RegexLiteral{Val: regex}
	}

	return mm, nil
}

// Interval represents a repeating interval for a query.
type Interval struct {
	Duration time.Duration
	Offset   time.Duration
}

// IsZero returns true if the interval has no duration.
func (i Interval) IsZero() bool { return i.Duration == 0 }

func encodeInterval(i Interval) *internal.Interval {
	return &internal.Interval{
		Duration: proto.Int64(i.Duration.Nanoseconds()),
		Offset:   proto.Int64(i.Offset.Nanoseconds()),
	}
}

func decodeInterval(pb *internal.Interval) Interval {
	return Interval{
		Duration: time.Duration(pb.GetDuration()),
		Offset:   time.Duration(pb.GetOffset()),
	}
}

func encodeVarRef(ref influxql.VarRef) *internal.VarRef {
	return &internal.VarRef{
		Val:  proto.String(ref.Val),
		Type: proto.Int32(int32(ref.Type)),
	}
}

func decodeVarRef(pb *internal.VarRef) influxql.VarRef {
	return influxql.VarRef{
		Val:  pb.GetVal(),
		Type: influxql.DataType(pb.GetType()),
	}
}

type nilFloatIterator struct{}

func (*nilFloatIterator) Stats() IteratorStats       { return IteratorStats{} }
func (*nilFloatIterator) Close() error               { return nil }
func (*nilFloatIterator) Next() (*FloatPoint, error) { return nil, nil }

type nilFloatReaderIterator struct {
	r io.Reader
}

func (*nilFloatReaderIterator) Stats() IteratorStats { return IteratorStats{} }
func (itr *nilFloatReaderIterator) Close() error {
	if r, ok := itr.r.(io.ReadCloser); ok {
		itr.r = nil
		return r.Close()
	}
	return nil
}
func (*nilFloatReaderIterator) Next() (*FloatPoint, error) { return nil, nil }

// IteratorStats represents statistics about an iterator.
// Some statistics are available immediately upon iterator creation while
// some are derived as the iterator processes data.
type IteratorStats struct {
	SeriesN int // series represented
	PointN  int // points returned
}

// Add aggregates fields from s and other together. Overwrites s.
func (s *IteratorStats) Add(other IteratorStats) {
	s.SeriesN += other.SeriesN
	s.PointN += other.PointN
}

func encodeIteratorStats(stats *IteratorStats) *internal.IteratorStats {
	return &internal.IteratorStats{
		SeriesN: proto.Int64(int64(stats.SeriesN)),
		PointN:  proto.Int64(int64(stats.PointN)),
	}
}

func decodeIteratorStats(pb *internal.IteratorStats) IteratorStats {
	return IteratorStats{
		SeriesN: int(pb.GetSeriesN()),
		PointN:  int(pb.GetPointN()),
	}
}

func decodeIteratorTrace(ctx context.Context, data []byte) error {
	pt := tracing.TraceFromContext(ctx)
	if pt == nil {
		return nil
	}

	var ct tracing.Trace
	if err := ct.UnmarshalBinary(data); err != nil {
		return err
	}

	pt.Merge(&ct)

	return nil
}

// IteratorCost contains statistics retrieved for explaining what potential
// cost may be incurred by instantiating an iterator.
type IteratorCost struct {
	// The total number of shards that are touched by this query.
	NumShards int64

	// The total number of non-unique series that are accessed by this query.
	// This number matches the number of cursors created by the query since
	// one cursor is created for every series.
	NumSeries int64

	// The number of cached values that may be read by this query.
	CachedValues int64

	// The total number of non-unique files that may be accessed by this query.
	// This will count the number of files accessed by each series so files
	// will likely be double counted.
	NumFiles int64

	// The number of blocks that had the potential to be accessed.
	BlocksRead int64

	// The amount of data that can be potentially read.
	BlockSize int64
}

// Combine combines the results of two IteratorCost structures into one.
func (c IteratorCost) Combine(other IteratorCost) IteratorCost {
	return IteratorCost{
		NumShards:    c.NumShards + other.NumShards,
		NumSeries:    c.NumSeries + other.NumSeries,
		CachedValues: c.CachedValues + other.CachedValues,
		NumFiles:     c.NumFiles + other.NumFiles,
		BlocksRead:   c.BlocksRead + other.BlocksRead,
		BlockSize:    c.BlockSize + other.BlockSize,
	}
}
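
// combineExample is an illustrative sketch, not part of the original API:
// per-shard cost estimates fold into a single total by summing each field.
func combineExample() IteratorCost {
	a := IteratorCost{NumShards: 1, NumSeries: 10, BlocksRead: 100}
	b := IteratorCost{NumShards: 1, NumSeries: 5, BlocksRead: 40}
	return a.Combine(b) // IteratorCost{NumShards: 2, NumSeries: 15, BlocksRead: 140}
}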

// floatFastDedupeIterator outputs unique points where the point has a single aux field.
type floatFastDedupeIterator struct {
	input FloatIterator
	m     map[fastDedupeKey]struct{} // lookup of points already sent
}

// newFloatFastDedupeIterator returns a new instance of floatFastDedupeIterator.
func newFloatFastDedupeIterator(input FloatIterator) *floatFastDedupeIterator {
	return &floatFastDedupeIterator{
		input: input,
		m:     make(map[fastDedupeKey]struct{}),
	}
}

// Stats returns stats from the input iterator.
func (itr *floatFastDedupeIterator) Stats() IteratorStats { return itr.input.Stats() }

// Close closes the iterator and all child iterators.
func (itr *floatFastDedupeIterator) Close() error { return itr.input.Close() }

// Next returns the next unique point from the input iterator.
func (itr *floatFastDedupeIterator) Next() (*FloatPoint, error) {
	for {
		// Read next point.
		// Skip if there are not any aux fields.
		p, err := itr.input.Next()
		if p == nil || err != nil {
			return nil, err
		} else if len(p.Aux) == 0 {
			continue
		}

		// If the point has already been output then move to the next point.
		key := fastDedupeKey{name: p.Name}
		key.values[0] = p.Aux[0]
		if len(p.Aux) > 1 {
			key.values[1] = p.Aux[1]
		}
		if _, ok := itr.m[key]; ok {
			continue
		}

		// Otherwise mark it as emitted and return point.
		itr.m[key] = struct{}{}
		return p, nil
	}
}

// fastDedupeKey identifies a point by its measurement name and up to two
// auxiliary field values.
type fastDedupeKey struct {
	name   string
	values [2]interface{}
}

type reverseStringSlice []string

func (p reverseStringSlice) Len() int           { return len(p) }
func (p reverseStringSlice) Less(i, j int) bool { return p[i] > p[j] }
func (p reverseStringSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
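
// reverseStringSlice satisfies sort.Interface with an inverted Less, so
// sorting it yields descending order. A hypothetical usage sketch, assuming
// the "sort" package is imported:
//
//	keys := []string{"b", "c", "a"}
//	sort.Sort(reverseStringSlice(keys)) // keys == ["c", "b", "a"]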

// abs returns the absolute value of v using a branchless two's-complement
// trick: sign is 0 for non-negative v and -1 (all ones) for negative v, so
// (v ^ sign) - sign flips the bits and adds one only when v is negative.
func abs(v int64) int64 {
	sign := v >> 63
	return (v ^ sign) - sign
}

// IteratorEncoder is an encoder for encoding an iterator's points to w.
type IteratorEncoder struct {
	w io.Writer

	// Frequency with which stats are emitted.
	StatsInterval time.Duration
}

// NewIteratorEncoder returns a new IteratorEncoder that encodes an
// iterator's points to w.
func NewIteratorEncoder(w io.Writer) *IteratorEncoder {
	return &IteratorEncoder{
		w: w,

		StatsInterval: DefaultStatsInterval,
	}
}

// EncodeIterator encodes and writes all of itr's points to the underlying writer.
func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error {
	switch itr := itr.(type) {
	case FloatIterator:
		return enc.encodeFloatIterator(itr)
	case IntegerIterator:
		return enc.encodeIntegerIterator(itr)
	case StringIterator:
		return enc.encodeStringIterator(itr)
	case BooleanIterator:
		return enc.encodeBooleanIterator(itr)
	default:
		panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr))
	}
}

// EncodeTrace encodes and writes trace to the underlying writer, framed as
// a length-prefixed point that carries the marshaled trace data.
func (enc *IteratorEncoder) EncodeTrace(trace *tracing.Trace) error {
	data, err := trace.MarshalBinary()
	if err != nil {
		return err
	}

	buf, err := proto.Marshal(&internal.Point{
		Name: proto.String(""),
		Tags: proto.String(""),
		Time: proto.Int64(0),
		Nil:  proto.Bool(false),

		Trace: data,
	})
	if err != nil {
		return err
	}

	if err = binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
		return err
	}
	if _, err = enc.w.Write(buf); err != nil {
		return err
	}
	return nil
}

// encodeStats encodes a stats object in the point stream.
func (enc *IteratorEncoder) encodeStats(stats IteratorStats) error {
	buf, err := proto.Marshal(&internal.Point{
		Name: proto.String(""),
		Tags: proto.String(""),
		Time: proto.Int64(0),
		Nil:  proto.Bool(false),

		Stats: encodeIteratorStats(&stats),
	})
	if err != nil {
		return err
	}

	if err = binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
		return err
	}
	if _, err = enc.w.Write(buf); err != nil {
		return err
	}
	return nil
}
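
// readPointFrame is an illustrative sketch, not part of the original API,
// of the framing used by EncodeTrace and encodeStats above: each frame is a
// big-endian uint32 length followed by a marshaled internal.Point.
func readPointFrame(r io.Reader) (*internal.Point, error) {
	var n uint32
	if err := binary.Read(r, binary.BigEndian, &n); err != nil {
		return nil, err
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	var pt internal.Point
	if err := proto.Unmarshal(buf, &pt); err != nil {
		return nil, err
	}
	return &pt, nil
}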