influxdb/query/cursor.go

448 lines
11 KiB
Go

package query
import (
"math"
"time"
"github.com/influxdata/influxql"
)
var NullFloat interface{} = (*float64)(nil)
// Series represents the metadata about a series.
type Series struct {
// Name is the measurement name.
Name string
// Tags for the series.
Tags Tags
// This is an internal id used to easily compare if a series is the
// same as another series. Whenever the internal cursor changes
// to a new series, this id gets incremented. It is not exposed to
// the user so we can implement this in whatever way we want.
// If a series is not generated by a cursor, this id is zero and
// it will instead attempt to compare the name and tags.
id uint64
}
// SameSeries checks if this is the same series as another one.
// It does not necessarily check for equality so this is different from
// checking to see if the name and tags are the same. It checks whether
// the two are part of the same series in the response.
func (s Series) SameSeries(other Series) bool {
if s.id != 0 && other.id != 0 {
return s.id == other.id
}
return s.Name == other.Name && s.Tags.ID() == other.Tags.ID()
}
// Equal checks to see if the Series are identical.
func (s Series) Equal(other Series) bool {
if s.id != 0 && other.id != 0 {
// If the ids are the same, then we can short-circuit and assume they
// are the same. If they are not the same, do the long check since
// they may still be identical, but not necessarily generated from
// the same cursor.
if s.id == other.id {
return true
}
}
return s.Name == other.Name && s.Tags.ID() == other.Tags.ID()
}
// Row represents a single row returned by the query engine.
type Row struct {
// Time returns the time for this row. If the cursor was created to
// return time as one of the values, the time will also be included as
// a time.Time in the appropriate column within Values.
// This ensures that time is always present in the Row structure
// even if it hasn't been requested in the output.
Time int64
// Series contains the series metadata for this row.
Series Series
// Values contains the values within the current row.
Values []interface{}
}
type Cursor interface {
// Scan will retrieve the next row and assign the result to
// the passed in Row. If the Row has not been initialized, the Cursor
// will initialize the Row.
// To increase speed and memory usage, the same Row can be used and
// the previous values will be overwritten while using the same memory.
Scan(row *Row) bool
// Stats returns the IteratorStats from the underlying iterators.
Stats() IteratorStats
// Err returns any errors that were encountered from scanning the rows.
Err() error
// Columns returns the column names and types.
Columns() []influxql.VarRef
// Close closes the underlying resources that the cursor is using.
Close() error
}
// RowCursor returns a Cursor that iterates over Rows.
func RowCursor(rows []Row, columns []influxql.VarRef) Cursor {
return &rowCursor{
rows: rows,
columns: columns,
}
}
type rowCursor struct {
rows []Row
columns []influxql.VarRef
series Series
}
func (cur *rowCursor) Scan(row *Row) bool {
if len(cur.rows) == 0 {
return false
}
*row = cur.rows[0]
if row.Series.Name != cur.series.Name || !row.Series.Tags.Equals(&cur.series.Tags) {
cur.series.Name = row.Series.Name
cur.series.Tags = row.Series.Tags
cur.series.id++
}
cur.rows = cur.rows[1:]
return true
}
func (cur *rowCursor) Stats() IteratorStats {
return IteratorStats{}
}
func (cur *rowCursor) Err() error {
return nil
}
func (cur *rowCursor) Columns() []influxql.VarRef {
return cur.columns
}
func (cur *rowCursor) Close() error {
return nil
}
type scannerFunc func(m map[string]interface{}) (int64, string, Tags)
type scannerCursorBase struct {
fields []influxql.Expr
m map[string]interface{}
series Series
columns []influxql.VarRef
loc *time.Location
scan scannerFunc
valuer influxql.ValuerEval
}
func newScannerCursorBase(scan scannerFunc, fields []*influxql.Field, loc *time.Location) scannerCursorBase {
typmap := FunctionTypeMapper{}
exprs := make([]influxql.Expr, len(fields))
columns := make([]influxql.VarRef, len(fields))
for i, f := range fields {
exprs[i] = f.Expr
columns[i] = influxql.VarRef{
Val: f.Name(),
Type: influxql.EvalType(f.Expr, nil, typmap),
}
}
if loc == nil {
loc = time.UTC
}
m := make(map[string]interface{})
return scannerCursorBase{
fields: exprs,
m: m,
columns: columns,
loc: loc,
scan: scan,
valuer: influxql.ValuerEval{
Valuer: influxql.MultiValuer(
MathValuer{},
influxql.MapValuer(m),
),
IntegerFloatDivision: true,
},
}
}
func (cur *scannerCursorBase) Scan(row *Row) bool {
ts, name, tags := cur.scan(cur.m)
if ts == ZeroTime {
return false
}
row.Time = ts
if name != cur.series.Name || tags.ID() != cur.series.Tags.ID() {
cur.series.Name = name
cur.series.Tags = tags
cur.series.id++
}
row.Series = cur.series
if len(cur.columns) > len(row.Values) {
row.Values = make([]interface{}, len(cur.columns))
}
for i, expr := range cur.fields {
// A special case if the field is time to reduce memory allocations.
if ref, ok := expr.(*influxql.VarRef); ok && ref.Val == "time" {
row.Values[i] = time.Unix(0, row.Time).In(cur.loc)
continue
}
v := cur.valuer.Eval(expr)
if fv, ok := v.(float64); ok && math.IsNaN(fv) {
// If the float value is NaN, convert it to a null float
// so this can be serialized correctly, but not mistaken for
// a null value that needs to be filled.
v = NullFloat
}
row.Values[i] = v
}
return true
}
func (cur *scannerCursorBase) Columns() []influxql.VarRef {
return cur.columns
}
func (cur *scannerCursorBase) clear(m map[string]interface{}) {
for k := range m {
delete(m, k)
}
}
var _ Cursor = (*scannerCursor)(nil)
type scannerCursor struct {
scanner IteratorScanner
scannerCursorBase
}
func newScannerCursor(s IteratorScanner, fields []*influxql.Field, opt IteratorOptions) *scannerCursor {
cur := &scannerCursor{scanner: s}
cur.scannerCursorBase = newScannerCursorBase(cur.scan, fields, opt.Location)
return cur
}
func (s *scannerCursor) scan(m map[string]interface{}) (int64, string, Tags) {
ts, name, tags := s.scanner.Peek()
// if a new series, clear the map of previous values
if name != s.series.Name || tags.ID() != s.series.Tags.ID() {
s.clear(m)
}
if ts == ZeroTime {
return ts, name, tags
}
s.scanner.ScanAt(ts, name, tags, m)
return ts, name, tags
}
func (cur *scannerCursor) Stats() IteratorStats {
return cur.scanner.Stats()
}
func (cur *scannerCursor) Err() error {
return cur.scanner.Err()
}
func (cur *scannerCursor) Close() error {
return cur.scanner.Close()
}
var _ Cursor = (*multiScannerCursor)(nil)
type multiScannerCursor struct {
scanners []IteratorScanner
err error
ascending bool
scannerCursorBase
}
func newMultiScannerCursor(scanners []IteratorScanner, fields []*influxql.Field, opt IteratorOptions) *multiScannerCursor {
cur := &multiScannerCursor{
scanners: scanners,
ascending: opt.Ascending,
}
cur.scannerCursorBase = newScannerCursorBase(cur.scan, fields, opt.Location)
return cur
}
func (cur *multiScannerCursor) scan(m map[string]interface{}) (ts int64, name string, tags Tags) {
ts = ZeroTime
for _, s := range cur.scanners {
curTime, curName, curTags := s.Peek()
if curTime == ZeroTime {
if err := s.Err(); err != nil {
cur.err = err
return ZeroTime, "", Tags{}
}
continue
}
if ts == ZeroTime {
ts, name, tags = curTime, curName, curTags
continue
}
if cur.ascending {
if (curName < name) || (curName == name && curTags.ID() < tags.ID()) || (curName == name && curTags.ID() == tags.ID() && curTime < ts) {
ts, name, tags = curTime, curName, curTags
}
continue
}
if (curName > name) || (curName == name && curTags.ID() > tags.ID()) || (curName == name && curTags.ID() == tags.ID() && curTime > ts) {
ts, name, tags = curTime, curName, curTags
}
}
if ts == ZeroTime {
return ts, name, tags
}
// if a new series, clear the map of previous values
if name != cur.series.Name || tags.ID() != cur.series.Tags.ID() {
cur.clear(m)
}
for _, s := range cur.scanners {
s.ScanAt(ts, name, tags, m)
}
return ts, name, tags
}
func (cur *multiScannerCursor) Stats() IteratorStats {
var stats IteratorStats
for _, s := range cur.scanners {
stats.Add(s.Stats())
}
return stats
}
func (cur *multiScannerCursor) Err() error {
return cur.err
}
func (cur *multiScannerCursor) Close() error {
var err error
for _, s := range cur.scanners {
if e := s.Close(); e != nil && err == nil {
err = e
}
}
return err
}
type filterCursor struct {
Cursor
// fields holds the mapping of field names to the index in the row
// based off of the column metadata. This only contains the fields
// we need and will exclude the ones we do not.
fields map[string]IteratorMap
filter influxql.Expr
m map[string]interface{}
valuer influxql.ValuerEval
}
func newFilterCursor(cur Cursor, filter influxql.Expr) *filterCursor {
fields := make(map[string]IteratorMap)
for _, name := range influxql.ExprNames(filter) {
for i, col := range cur.Columns() {
if name.Val == col.Val {
fields[name.Val] = FieldMap{
Index: i,
Type: name.Type,
}
break
}
}
// If the field is not a column, assume it is a tag value.
// We do not know what the tag values will be, but there really
// isn't any different between NullMap and a TagMap that's pointed
// at the wrong location for the purposes described here.
if _, ok := fields[name.Val]; !ok {
fields[name.Val] = TagMap(name.Val)
}
}
m := make(map[string]interface{})
return &filterCursor{
Cursor: cur,
fields: fields,
filter: filter,
m: m,
valuer: influxql.ValuerEval{Valuer: influxql.MapValuer(m)},
}
}
func (cur *filterCursor) Scan(row *Row) bool {
for cur.Cursor.Scan(row) {
// Use the field mappings to prepare the map for the valuer.
for name, f := range cur.fields {
cur.m[name] = f.Value(row)
}
if cur.valuer.EvalBool(cur.filter) {
// Passes the filter! Return true. We no longer need to
// search for a suitable value.
return true
}
}
return false
}
type nullCursor struct {
columns []influxql.VarRef
}
func newNullCursor(fields []*influxql.Field) *nullCursor {
columns := make([]influxql.VarRef, len(fields))
for i, f := range fields {
columns[i].Val = f.Name()
}
return &nullCursor{columns: columns}
}
func (cur *nullCursor) Scan(row *Row) bool {
return false
}
func (cur *nullCursor) Stats() IteratorStats {
return IteratorStats{}
}
func (cur *nullCursor) Err() error {
return nil
}
func (cur *nullCursor) Columns() []influxql.VarRef {
return cur.columns
}
func (cur *nullCursor) Close() error {
return nil
}
// DrainCursor will read and discard all values from a Cursor and return the error
// if one happens.
func DrainCursor(cur Cursor) error {
var row Row
for cur.Scan(&row) {
// Do nothing with the result.
}
return cur.Err()
}