448 lines
11 KiB
Go
448 lines
11 KiB
Go
package query
|
|
|
|
import (
|
|
"math"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxql"
|
|
)
|
|
|
|
var NullFloat interface{} = (*float64)(nil)
|
|
|
|
// Series represents the metadata about a series.
|
|
type Series struct {
|
|
// Name is the measurement name.
|
|
Name string
|
|
|
|
// Tags for the series.
|
|
Tags Tags
|
|
|
|
// This is an internal id used to easily compare if a series is the
|
|
// same as another series. Whenever the internal cursor changes
|
|
// to a new series, this id gets incremented. It is not exposed to
|
|
// the user so we can implement this in whatever way we want.
|
|
// If a series is not generated by a cursor, this id is zero and
|
|
// it will instead attempt to compare the name and tags.
|
|
id uint64
|
|
}
|
|
|
|
// SameSeries checks if this is the same series as another one.
|
|
// It does not necessarily check for equality so this is different from
|
|
// checking to see if the name and tags are the same. It checks whether
|
|
// the two are part of the same series in the response.
|
|
func (s Series) SameSeries(other Series) bool {
|
|
if s.id != 0 && other.id != 0 {
|
|
return s.id == other.id
|
|
}
|
|
return s.Name == other.Name && s.Tags.ID() == other.Tags.ID()
|
|
}
|
|
|
|
// Equal checks to see if the Series are identical.
|
|
func (s Series) Equal(other Series) bool {
|
|
if s.id != 0 && other.id != 0 {
|
|
// If the ids are the same, then we can short-circuit and assume they
|
|
// are the same. If they are not the same, do the long check since
|
|
// they may still be identical, but not necessarily generated from
|
|
// the same cursor.
|
|
if s.id == other.id {
|
|
return true
|
|
}
|
|
}
|
|
return s.Name == other.Name && s.Tags.ID() == other.Tags.ID()
|
|
}
|
|
|
|
// Row represents a single row returned by the query engine.
|
|
type Row struct {
|
|
// Time returns the time for this row. If the cursor was created to
|
|
// return time as one of the values, the time will also be included as
|
|
// a time.Time in the appropriate column within Values.
|
|
// This ensures that time is always present in the Row structure
|
|
// even if it hasn't been requested in the output.
|
|
Time int64
|
|
|
|
// Series contains the series metadata for this row.
|
|
Series Series
|
|
|
|
// Values contains the values within the current row.
|
|
Values []interface{}
|
|
}
|
|
|
|
type Cursor interface {
|
|
// Scan will retrieve the next row and assign the result to
|
|
// the passed in Row. If the Row has not been initialized, the Cursor
|
|
// will initialize the Row.
|
|
// To increase speed and memory usage, the same Row can be used and
|
|
// the previous values will be overwritten while using the same memory.
|
|
Scan(row *Row) bool
|
|
|
|
// Stats returns the IteratorStats from the underlying iterators.
|
|
Stats() IteratorStats
|
|
|
|
// Err returns any errors that were encountered from scanning the rows.
|
|
Err() error
|
|
|
|
// Columns returns the column names and types.
|
|
Columns() []influxql.VarRef
|
|
|
|
// Close closes the underlying resources that the cursor is using.
|
|
Close() error
|
|
}
|
|
|
|
// RowCursor returns a Cursor that iterates over Rows.
|
|
func RowCursor(rows []Row, columns []influxql.VarRef) Cursor {
|
|
return &rowCursor{
|
|
rows: rows,
|
|
columns: columns,
|
|
}
|
|
}
|
|
|
|
type rowCursor struct {
|
|
rows []Row
|
|
columns []influxql.VarRef
|
|
|
|
series Series
|
|
}
|
|
|
|
func (cur *rowCursor) Scan(row *Row) bool {
|
|
if len(cur.rows) == 0 {
|
|
return false
|
|
}
|
|
|
|
*row = cur.rows[0]
|
|
if row.Series.Name != cur.series.Name || !row.Series.Tags.Equals(&cur.series.Tags) {
|
|
cur.series.Name = row.Series.Name
|
|
cur.series.Tags = row.Series.Tags
|
|
cur.series.id++
|
|
}
|
|
cur.rows = cur.rows[1:]
|
|
return true
|
|
}
|
|
|
|
func (cur *rowCursor) Stats() IteratorStats {
|
|
return IteratorStats{}
|
|
}
|
|
|
|
func (cur *rowCursor) Err() error {
|
|
return nil
|
|
}
|
|
|
|
func (cur *rowCursor) Columns() []influxql.VarRef {
|
|
return cur.columns
|
|
}
|
|
|
|
func (cur *rowCursor) Close() error {
|
|
return nil
|
|
}
|
|
|
|
type scannerFunc func(m map[string]interface{}) (int64, string, Tags)
|
|
|
|
type scannerCursorBase struct {
|
|
fields []influxql.Expr
|
|
m map[string]interface{}
|
|
|
|
series Series
|
|
columns []influxql.VarRef
|
|
loc *time.Location
|
|
|
|
scan scannerFunc
|
|
valuer influxql.ValuerEval
|
|
}
|
|
|
|
func newScannerCursorBase(scan scannerFunc, fields []*influxql.Field, loc *time.Location) scannerCursorBase {
|
|
typmap := FunctionTypeMapper{}
|
|
exprs := make([]influxql.Expr, len(fields))
|
|
columns := make([]influxql.VarRef, len(fields))
|
|
for i, f := range fields {
|
|
exprs[i] = f.Expr
|
|
columns[i] = influxql.VarRef{
|
|
Val: f.Name(),
|
|
Type: influxql.EvalType(f.Expr, nil, typmap),
|
|
}
|
|
}
|
|
if loc == nil {
|
|
loc = time.UTC
|
|
}
|
|
|
|
m := make(map[string]interface{})
|
|
return scannerCursorBase{
|
|
fields: exprs,
|
|
m: m,
|
|
columns: columns,
|
|
loc: loc,
|
|
scan: scan,
|
|
valuer: influxql.ValuerEval{
|
|
Valuer: influxql.MultiValuer(
|
|
MathValuer{},
|
|
influxql.MapValuer(m),
|
|
),
|
|
IntegerFloatDivision: true,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (cur *scannerCursorBase) Scan(row *Row) bool {
|
|
ts, name, tags := cur.scan(cur.m)
|
|
if ts == ZeroTime {
|
|
return false
|
|
}
|
|
|
|
row.Time = ts
|
|
if name != cur.series.Name || tags.ID() != cur.series.Tags.ID() {
|
|
cur.series.Name = name
|
|
cur.series.Tags = tags
|
|
cur.series.id++
|
|
}
|
|
row.Series = cur.series
|
|
|
|
if len(cur.columns) > len(row.Values) {
|
|
row.Values = make([]interface{}, len(cur.columns))
|
|
}
|
|
|
|
for i, expr := range cur.fields {
|
|
// A special case if the field is time to reduce memory allocations.
|
|
if ref, ok := expr.(*influxql.VarRef); ok && ref.Val == "time" {
|
|
row.Values[i] = time.Unix(0, row.Time).In(cur.loc)
|
|
continue
|
|
}
|
|
v := cur.valuer.Eval(expr)
|
|
if fv, ok := v.(float64); ok && math.IsNaN(fv) {
|
|
// If the float value is NaN, convert it to a null float
|
|
// so this can be serialized correctly, but not mistaken for
|
|
// a null value that needs to be filled.
|
|
v = NullFloat
|
|
}
|
|
row.Values[i] = v
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (cur *scannerCursorBase) Columns() []influxql.VarRef {
|
|
return cur.columns
|
|
}
|
|
|
|
func (cur *scannerCursorBase) clear(m map[string]interface{}) {
|
|
for k := range m {
|
|
delete(m, k)
|
|
}
|
|
}
|
|
|
|
var _ Cursor = (*scannerCursor)(nil)
|
|
|
|
type scannerCursor struct {
|
|
scanner IteratorScanner
|
|
scannerCursorBase
|
|
}
|
|
|
|
func newScannerCursor(s IteratorScanner, fields []*influxql.Field, opt IteratorOptions) *scannerCursor {
|
|
cur := &scannerCursor{scanner: s}
|
|
cur.scannerCursorBase = newScannerCursorBase(cur.scan, fields, opt.Location)
|
|
return cur
|
|
}
|
|
|
|
func (s *scannerCursor) scan(m map[string]interface{}) (int64, string, Tags) {
|
|
ts, name, tags := s.scanner.Peek()
|
|
// if a new series, clear the map of previous values
|
|
if name != s.series.Name || tags.ID() != s.series.Tags.ID() {
|
|
s.clear(m)
|
|
}
|
|
if ts == ZeroTime {
|
|
return ts, name, tags
|
|
}
|
|
s.scanner.ScanAt(ts, name, tags, m)
|
|
return ts, name, tags
|
|
}
|
|
|
|
func (cur *scannerCursor) Stats() IteratorStats {
|
|
return cur.scanner.Stats()
|
|
}
|
|
|
|
func (cur *scannerCursor) Err() error {
|
|
return cur.scanner.Err()
|
|
}
|
|
|
|
func (cur *scannerCursor) Close() error {
|
|
return cur.scanner.Close()
|
|
}
|
|
|
|
var _ Cursor = (*multiScannerCursor)(nil)
|
|
|
|
type multiScannerCursor struct {
|
|
scanners []IteratorScanner
|
|
err error
|
|
ascending bool
|
|
scannerCursorBase
|
|
}
|
|
|
|
func newMultiScannerCursor(scanners []IteratorScanner, fields []*influxql.Field, opt IteratorOptions) *multiScannerCursor {
|
|
cur := &multiScannerCursor{
|
|
scanners: scanners,
|
|
ascending: opt.Ascending,
|
|
}
|
|
cur.scannerCursorBase = newScannerCursorBase(cur.scan, fields, opt.Location)
|
|
return cur
|
|
}
|
|
|
|
func (cur *multiScannerCursor) scan(m map[string]interface{}) (ts int64, name string, tags Tags) {
|
|
ts = ZeroTime
|
|
for _, s := range cur.scanners {
|
|
curTime, curName, curTags := s.Peek()
|
|
if curTime == ZeroTime {
|
|
if err := s.Err(); err != nil {
|
|
cur.err = err
|
|
return ZeroTime, "", Tags{}
|
|
}
|
|
continue
|
|
}
|
|
|
|
if ts == ZeroTime {
|
|
ts, name, tags = curTime, curName, curTags
|
|
continue
|
|
}
|
|
|
|
if cur.ascending {
|
|
if (curName < name) || (curName == name && curTags.ID() < tags.ID()) || (curName == name && curTags.ID() == tags.ID() && curTime < ts) {
|
|
ts, name, tags = curTime, curName, curTags
|
|
}
|
|
continue
|
|
}
|
|
|
|
if (curName > name) || (curName == name && curTags.ID() > tags.ID()) || (curName == name && curTags.ID() == tags.ID() && curTime > ts) {
|
|
ts, name, tags = curTime, curName, curTags
|
|
}
|
|
}
|
|
|
|
if ts == ZeroTime {
|
|
return ts, name, tags
|
|
}
|
|
// if a new series, clear the map of previous values
|
|
if name != cur.series.Name || tags.ID() != cur.series.Tags.ID() {
|
|
cur.clear(m)
|
|
}
|
|
for _, s := range cur.scanners {
|
|
s.ScanAt(ts, name, tags, m)
|
|
}
|
|
return ts, name, tags
|
|
}
|
|
|
|
func (cur *multiScannerCursor) Stats() IteratorStats {
|
|
var stats IteratorStats
|
|
for _, s := range cur.scanners {
|
|
stats.Add(s.Stats())
|
|
}
|
|
return stats
|
|
}
|
|
|
|
func (cur *multiScannerCursor) Err() error {
|
|
return cur.err
|
|
}
|
|
|
|
func (cur *multiScannerCursor) Close() error {
|
|
var err error
|
|
for _, s := range cur.scanners {
|
|
if e := s.Close(); e != nil && err == nil {
|
|
err = e
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
type filterCursor struct {
|
|
Cursor
|
|
// fields holds the mapping of field names to the index in the row
|
|
// based off of the column metadata. This only contains the fields
|
|
// we need and will exclude the ones we do not.
|
|
fields map[string]IteratorMap
|
|
filter influxql.Expr
|
|
m map[string]interface{}
|
|
valuer influxql.ValuerEval
|
|
}
|
|
|
|
func newFilterCursor(cur Cursor, filter influxql.Expr) *filterCursor {
|
|
fields := make(map[string]IteratorMap)
|
|
for _, name := range influxql.ExprNames(filter) {
|
|
for i, col := range cur.Columns() {
|
|
if name.Val == col.Val {
|
|
fields[name.Val] = FieldMap{
|
|
Index: i,
|
|
Type: name.Type,
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
// If the field is not a column, assume it is a tag value.
|
|
// We do not know what the tag values will be, but there really
|
|
// isn't any different between NullMap and a TagMap that's pointed
|
|
// at the wrong location for the purposes described here.
|
|
if _, ok := fields[name.Val]; !ok {
|
|
fields[name.Val] = TagMap(name.Val)
|
|
}
|
|
}
|
|
m := make(map[string]interface{})
|
|
return &filterCursor{
|
|
Cursor: cur,
|
|
fields: fields,
|
|
filter: filter,
|
|
m: m,
|
|
valuer: influxql.ValuerEval{Valuer: influxql.MapValuer(m)},
|
|
}
|
|
}
|
|
|
|
func (cur *filterCursor) Scan(row *Row) bool {
|
|
for cur.Cursor.Scan(row) {
|
|
// Use the field mappings to prepare the map for the valuer.
|
|
for name, f := range cur.fields {
|
|
cur.m[name] = f.Value(row)
|
|
}
|
|
|
|
if cur.valuer.EvalBool(cur.filter) {
|
|
// Passes the filter! Return true. We no longer need to
|
|
// search for a suitable value.
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
type nullCursor struct {
|
|
columns []influxql.VarRef
|
|
}
|
|
|
|
func newNullCursor(fields []*influxql.Field) *nullCursor {
|
|
columns := make([]influxql.VarRef, len(fields))
|
|
for i, f := range fields {
|
|
columns[i].Val = f.Name()
|
|
}
|
|
return &nullCursor{columns: columns}
|
|
}
|
|
|
|
func (cur *nullCursor) Scan(row *Row) bool {
|
|
return false
|
|
}
|
|
|
|
func (cur *nullCursor) Stats() IteratorStats {
|
|
return IteratorStats{}
|
|
}
|
|
|
|
func (cur *nullCursor) Err() error {
|
|
return nil
|
|
}
|
|
|
|
func (cur *nullCursor) Columns() []influxql.VarRef {
|
|
return cur.columns
|
|
}
|
|
|
|
func (cur *nullCursor) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// DrainCursor will read and discard all values from a Cursor and return the error
|
|
// if one happens.
|
|
func DrainCursor(cur Cursor) error {
|
|
var row Row
|
|
for cur.Scan(&row) {
|
|
// Do nothing with the result.
|
|
}
|
|
return cur.Err()
|
|
}
|